diff --git a/Makefile b/Makefile index e3a42bed9..d0ca51aac 100644 --- a/Makefile +++ b/Makefile @@ -48,6 +48,7 @@ TESTS = \ db_test \ dbformat_test \ env_test \ + blob_store_test \ filelock_test \ filename_test \ filter_block_test \ @@ -204,6 +205,9 @@ cache_test: util/cache_test.o $(LIBOBJECTS) $(TESTHARNESS) coding_test: util/coding_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) util/coding_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) +blob_store_test: util/blob_store_test.o $(LIBOBJECTS) $(TESTHARNESS) $(TESTUTIL) + $(CXX) util/blob_store_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o$@ $(LDFLAGS) $(COVERAGEFLAGS) + stringappend_test: utilities/merge_operators/string_append/stringappend_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) utilities/merge_operators/string_append/stringappend_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) diff --git a/util/blob_store.cc b/util/blob_store.cc new file mode 100644 index 000000000..694787210 --- /dev/null +++ b/util/blob_store.cc @@ -0,0 +1,297 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "util/blob_store.h" + +namespace rocksdb { + +using namespace std; + +// BlobChunk +bool BlobChunk::ImmediatelyBefore(const BlobChunk& chunk) const { + // overlapping!? + assert(!Overlap(chunk)); + // size == 0 is a marker, not a block + return size != 0 && + bucket_id == chunk.bucket_id && + offset + size == chunk.offset; +} + +bool BlobChunk::Overlap(const BlobChunk &chunk) const { + return size != 0 && chunk.size != 0 && bucket_id == chunk.bucket_id && + ((offset >= chunk.offset && offset < chunk.offset + chunk.size) || + (chunk.offset >= offset && chunk.offset < offset + size)); +} + +// Blob +string Blob::ToString() const { + string ret; + for (auto chunk : chunks) { + PutFixed32(&ret, chunk.bucket_id); + PutFixed32(&ret, chunk.offset); + PutFixed32(&ret, chunk.size); + } + return ret; +} + +Blob::Blob(const std::string& blob) { + for (uint32_t i = 0; i < blob.size(); ) { + uint32_t t[3] = {0}; + for (int j = 0; j < 3 && i + sizeof(uint32_t) - 1 < blob.size(); + ++j, i += sizeof(uint32_t)) { + t[j] = DecodeFixed32(blob.data() + i); + } + chunks.push_back(BlobChunk(t[0], t[1], t[2])); + } +} + +// FreeList +FreeList::FreeList() { + // We add (0, 0, 0) blob because it makes our life easier and + // code cleaner. (0, 0, 0) is always in the list so we can + // guarantee that free_chunks_list_ != nullptr, which avoids + // lots of unnecessary ifs + free_chunks_list_ = (FreeChunk *)malloc(sizeof(FreeChunk)); + free_chunks_list_->chunk = BlobChunk(0, 0, 0); + free_chunks_list_->next = nullptr; +} + +FreeList::~FreeList() { + while (free_chunks_list_ != nullptr) { + FreeChunk* t = free_chunks_list_; + free_chunks_list_ = free_chunks_list_->next; + free(t); + } +} + +Status FreeList::Free(const Blob& blob) { + MutexLock l(&mutex_); + + // add it back to the free list + for (auto chunk : blob.chunks) { + FreeChunk* itr = free_chunks_list_; + + // find a node AFTER which we'll add the block + for ( ; itr->next != nullptr && itr->next->chunk <= chunk; + itr = itr->next) { + } + + // try to merge with previous block + if (itr->chunk.ImmediatelyBefore(chunk)) { + // merge + itr->chunk.size += chunk.size; + } else { + // Insert the block after itr + FreeChunk* t = (FreeChunk*)malloc(sizeof(FreeChunk)); + if (t == nullptr) { + throw runtime_error("Malloc failed"); + } + t->chunk = chunk; + t->next = itr->next; + itr->next = t; + itr = t; + } + + // try to merge with the next block + if (itr->next != nullptr && + itr->chunk.ImmediatelyBefore(itr->next->chunk)) { + FreeChunk *tobedeleted = itr->next; + itr->chunk.size += itr->next->chunk.size; + itr->next = itr->next->next; + free(tobedeleted); + } + } + + return Status::OK(); +} + +Status FreeList::Allocate(uint32_t blocks, Blob* blob) { + MutexLock l(&mutex_); + FreeChunk** best_fit_node = nullptr; + + // Find the smallest free chunk whose size is greater or equal to blocks + for (FreeChunk** itr = &free_chunks_list_; (*itr) != nullptr; + itr = &((*itr)->next)) { + if ((*itr)->chunk.size >= blocks && + (best_fit_node == nullptr || + (*best_fit_node)->chunk.size > (*itr)->chunk.size)) { + best_fit_node = itr; + } + } + + if (best_fit_node == nullptr || *best_fit_node == nullptr) { + // Not enough memory + return Status::Incomplete(""); + } + + blob->SetOneChunk((*best_fit_node)->chunk.bucket_id, + (*best_fit_node)->chunk.offset, + blocks); + + if ((*best_fit_node)->chunk.size > blocks) { + // just shorten best_fit_node + (*best_fit_node)->chunk.offset += blocks; + (*best_fit_node)->chunk.size -= blocks; + } else { + assert(blocks == (*best_fit_node)->chunk.size); + // delete best_fit_node + FreeChunk* t = *best_fit_node; + (*best_fit_node) = (*best_fit_node)->next; + free(t); + } + + return Status::OK(); +} + +bool FreeList::Overlap(const Blob &blob) const { + MutexLock l(&mutex_); + for (auto chunk : blob.chunks) { + for (auto itr = free_chunks_list_; itr != nullptr; itr = itr->next) { + if (itr->chunk.Overlap(chunk)) { + return true; + } + } + } + return false; +} + +// BlobStore +BlobStore::BlobStore(const string& directory, + uint64_t block_size, + uint32_t blocks_per_bucket, + Env* env) : + directory_(directory), + block_size_(block_size), + blocks_per_bucket_(blocks_per_bucket), + env_(env) { + env_->CreateDirIfMissing(directory_); + + storage_options_.use_mmap_writes = false; + storage_options_.use_mmap_reads = false; + + CreateNewBucket(); +} + +BlobStore::~BlobStore() { + // TODO we don't care about recovery for now +} + +Status BlobStore::Put(const char* value, uint64_t size, Blob* blob) { + // convert size to number of blocks + Status s = Allocate((size + block_size_ - 1) / block_size_, blob); + if (!s.ok()) { + return s; + } + + uint64_t offset = 0; // in bytes, not blocks + for (auto chunk : blob->chunks) { + uint64_t write_size = min(chunk.size * block_size_, size); + assert(chunk.bucket_id < buckets_.size()); + s = buckets_[chunk.bucket_id].get()->Write(chunk.offset * block_size_, + Slice(value + offset, + write_size)); + if (!s.ok()) { + Delete(*blob); + return s; + } + offset += write_size; + size -= write_size; + if (write_size < chunk.size * block_size_) { + // if we have any space left in the block, fill it up with zeros + string zero_string(chunk.size * block_size_ - write_size, 0); + s = buckets_[chunk.bucket_id].get()->Write(chunk.offset * block_size_ + + write_size, + Slice(zero_string)); + } + } + + if (size > 0) { + Delete(*blob); + return Status::IOError("Tried to write more data than fits in the blob"); + } + + return Status::OK(); +} + +Status BlobStore::Get(const Blob& blob, + string* value) const { + // assert that it doesn't overlap with free list + // it will get compiled out for release + assert(!free_list_.Overlap(blob)); + + uint32_t total_size = 0; // in blocks + for (auto chunk : blob.chunks) { + total_size += chunk.size; + } + assert(total_size > 0); + value->resize(total_size * block_size_); + + uint64_t offset = 0; // in bytes, not blocks + for (auto chunk : blob.chunks) { + Slice result; + assert(chunk.bucket_id < buckets_.size()); + Status s; + s = buckets_[chunk.bucket_id].get()->Read(chunk.offset * block_size_, + chunk.size * block_size_, + &result, + &value->at(offset)); + if (!s.ok() || result.size() < chunk.size * block_size_) { + value->clear(); + return Status::IOError("Could not read in from file"); + } + offset += chunk.size * block_size_; + } + + // remove the '\0's at the end of the string + value->erase(find(value->begin(), value->end(), '\0'), value->end()); + + return Status::OK(); +} + +Status BlobStore::Delete(const Blob& blob) { + return free_list_.Free(blob); +} + +Status BlobStore::Allocate(uint32_t blocks, Blob* blob) { + // TODO we don't currently support fragmented blobs + MutexLock l(&allocate_mutex_); + assert(blocks <= blocks_per_bucket_); + Status s; + + s = free_list_.Allocate(blocks, blob); + if (!s.ok()) { + CreateNewBucket(); + s = free_list_.Allocate(blocks, blob); + } + + return s; +} + +Status BlobStore::CreateNewBucket() { + int new_bucket_id; + new_bucket_id = buckets_.size(); + buckets_.push_back(unique_ptr()); + + char fname[200]; + sprintf(fname, "%s/%d.bs", directory_.c_str(), new_bucket_id); + + Status s = env_->NewRandomRWFile(string(fname), + &buckets_[new_bucket_id], + storage_options_); + if (!s.ok()) { + buckets_.erase(buckets_.begin() + new_bucket_id); + return s; + } + + s = buckets_[new_bucket_id].get()->Allocate( + 0, block_size_ * blocks_per_bucket_); + if (!s.ok()) { + buckets_.erase(buckets_.begin() + new_bucket_id); + return s; + } + + return free_list_.Free(Blob(new_bucket_id, 0, blocks_per_bucket_)); +} + +} // namespace rocksdb diff --git a/util/blob_store.h b/util/blob_store.h new file mode 100644 index 000000000..5d505f66d --- /dev/null +++ b/util/blob_store.h @@ -0,0 +1,151 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include "rocksdb/env.h" +#include "rocksdb/status.h" +#include "port/port.h" +#include "util/mutexlock.h" +#include "util/coding.h" + +#include +#include +#include +#include +#include +#include + +namespace rocksdb { + +struct BlobChunk { + uint32_t bucket_id; + uint32_t offset; // in blocks + uint32_t size; // in blocks + BlobChunk() {} + BlobChunk(uint32_t bucket_id, uint32_t offset, uint32_t size) : + bucket_id(bucket_id), offset(offset), size(size) {} + + bool operator <= (const BlobChunk& chunk) const { + if (bucket_id != chunk.bucket_id) { + return bucket_id < chunk.bucket_id; + } + if (offset != chunk.offset) { + return offset < chunk.offset; + } + return true; + } + + // returns true if it's immediately before chunk + bool ImmediatelyBefore(const BlobChunk& chunk) const; + // returns true if chunks overlap + bool Overlap(const BlobChunk &chunk) const; +}; + +// We represent each Blob as a string in format: +// bucket_id offset size|bucket_id offset size... +// The string can be used to reference the Blob stored on external +// device/file +// Not thread-safe! +struct Blob { + // Generates the string + std::string ToString() const; + // Parses the previously generated string + explicit Blob(const std::string& blob); + // Creates unfragmented Blob + Blob(uint32_t bucket_id, uint32_t offset, uint32_t size) { + SetOneChunk(bucket_id, offset, size); + } + Blob() {} + + void SetOneChunk(uint32_t bucket_id, uint32_t offset, uint32_t size) { + chunks.clear(); + chunks.push_back(BlobChunk(bucket_id, offset, size)); + } + + // bucket_id, offset, size + std::vector chunks; +}; + +// Keeps a list of free chunks +class FreeList { + public: + FreeList (); + ~FreeList(); + + // Allocates a a blob. Stores the allocated blob in + // 'blob'. Returns non-OK status if it failed to allocate. + // Thread-safe + Status Allocate(uint32_t blocks, Blob* blob); + // Frees the blob for reuse. Thread-safe + Status Free(const Blob& blob); + + // returns true if blob is overlapping with any of the + // chunks stored in free list + bool Overlap(const Blob &blob) const; + + private: + struct FreeChunk { + BlobChunk chunk; + FreeChunk* next; + }; + + FreeChunk* free_chunks_list_; + mutable port::Mutex mutex_; +}; + +// thread-safe +class BlobStore { + public: + // directory - wherever the blobs should be stored. It will be created + // if missing + // block_size - self explanatory + // blocks_per_bucket - how many blocks we want to keep in one bucket. + // Bucket is a device or a file that we use to store the blobs. + // If we don't have enough blocks to allocate a new blob, we will + // try to create a new file or device. + // env - env for creating new files + BlobStore(const std::string& directory, + uint64_t block_size, + uint32_t blocks_per_bucket, + Env* env); + ~BlobStore(); + + // Allocates space for size bytes (rounded up to be multiple of + // block size) and writes size bytes from value to a backing store. + // Sets Blob blob that can than be used for addressing the + // stored value. Returns non-OK status on error. + Status Put(const char* value, uint64_t size, Blob* blob); + // Value needs to have enough space to store all the loaded stuff. + // This function is thread safe! + Status Get(const Blob& blob, std::string* value) const; + // Frees the blob for reuse, but does not delete the data + // on the backing store. + Status Delete(const Blob& blob); + + private: + const std::string directory_; + // block_size_ is uint64_t because when we multiply with + // blocks_size_ we want the result to be uint64_t or + // we risk overflowing + const uint64_t block_size_; + const uint32_t blocks_per_bucket_; + Env* env_; + EnvOptions storage_options_; + FreeList free_list_; + // protected by buckets mutex + std::vector> buckets_; + mutable port::Mutex allocate_mutex_; + + // Calls FreeList allocate. If free list can't allocate + // new blob, creates new bucket and tries again + // Thread-safe + Status Allocate(uint32_t blocks, Blob* blob); + + // Creates a new backing store and adds all the blocks + // from the new backing store to the free list + // NOT thread-safe, call with lock held + Status CreateNewBucket(); +}; + +} // namespace rocksdb diff --git a/util/blob_store_test.cc b/util/blob_store_test.cc new file mode 100644 index 000000000..c8a8e8650 --- /dev/null +++ b/util/blob_store_test.cc @@ -0,0 +1,133 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "util/blob_store.h" + +#include "util/testharness.h" +#include "util/testutil.h" +#include "util/random.h" + +#include +#include + +namespace rocksdb { + +using namespace std; + +class BlobStoreTest { }; + +TEST(BlobStoreTest, RangeParseTest) { + Blob e; + for (int i = 0; i < 5; ++i) { + e.chunks.push_back(BlobChunk(rand(), rand(), rand())); + } + string x = e.ToString(); + Blob nx(x); + + ASSERT_EQ(nx.ToString(), x); +} + +// make sure we're reusing the freed space +TEST(BlobStoreTest, SanityTest) { + const uint64_t block_size = 10; + const uint32_t blocks_per_file = 20; + Random random(5); + + BlobStore blob_store(test::TmpDir() + "/blob_store_test", + block_size, + blocks_per_file, + Env::Default()); + + string buf; + + // put string of size 170 + test::RandomString(&random, 170, &buf); + Blob r1; + ASSERT_OK(blob_store.Put(buf.data(), 170, &r1)); + // use the first file + for (size_t i = 0; i < r1.chunks.size(); ++i) { + ASSERT_EQ(r1.chunks[0].bucket_id, 0u); + } + + // put string of size 30 + test::RandomString(&random, 30, &buf); + Blob r2; + ASSERT_OK(blob_store.Put(buf.data(), 30, &r2)); + // use the first file + for (size_t i = 0; i < r2.chunks.size(); ++i) { + ASSERT_EQ(r2.chunks[0].bucket_id, 0u); + } + + // delete blob of size 170 + ASSERT_OK(blob_store.Delete(r1)); + + // put a string of size 100 + test::RandomString(&random, 100, &buf); + Blob r3; + ASSERT_OK(blob_store.Put(buf.data(), 100, &r3)); + // use the first file + for (size_t i = 0; i < r3.chunks.size(); ++i) { + ASSERT_EQ(r3.chunks[0].bucket_id, 0u); + } + + // put a string of size 70 + test::RandomString(&random, 70, &buf); + Blob r4; + ASSERT_OK(blob_store.Put(buf.data(), 70, &r4)); + // use the first file + for (size_t i = 0; i < r4.chunks.size(); ++i) { + ASSERT_EQ(r4.chunks[0].bucket_id, 0u); + } + + // put a string of size 5 + test::RandomString(&random, 5, &buf); + Blob r5; + ASSERT_OK(blob_store.Put(buf.data(), 70, &r5)); + // now you get to use the second file + for (size_t i = 0; i < r5.chunks.size(); ++i) { + ASSERT_EQ(r5.chunks[0].bucket_id, 1u); + } + +} + +TEST(BlobStoreTest, CreateAndStoreTest) { + const uint64_t block_size = 10; + const uint32_t blocks_per_file = 1000; + const int max_blurb_size = 300; + Random random(5); + + BlobStore blob_store(test::TmpDir() + "/blob_store_test", + block_size, + blocks_per_file, + Env::Default()); + vector> ranges; + + for (int i = 0; i < 20000; ++i) { + int decision = rand() % 5; + if (decision <= 2 || ranges.size() == 0) { + string buf; + int size_blocks = (rand() % max_blurb_size + 1); + int string_size = size_blocks * block_size - (rand() % block_size); + test::RandomString(&random, string_size, &buf); + Blob r; + ASSERT_OK(blob_store.Put(buf.data(), string_size, &r)); + ranges.push_back(make_pair(r, buf)); + } else if (decision == 3) { + int ti = rand() % ranges.size(); + string out_buf; + ASSERT_OK(blob_store.Get(ranges[ti].first, &out_buf)); + ASSERT_EQ(ranges[ti].second, out_buf); + } else { + int ti = rand() % ranges.size(); + ASSERT_OK(blob_store.Delete(ranges[ti].first)); + ranges.erase(ranges.begin() + ti); + } + } +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + return rocksdb::test::RunAllTests(); +}