@ -106,21 +106,27 @@ bool FreeList::Overlap(const Blob &blob) const {
BlobStore : : BlobStore ( const string & directory ,
uint64_t block_size ,
uint32_t blocks_per_bucket ,
uint32_t max_buckets ,
Env * env ) :
directory_ ( directory ) ,
block_size_ ( block_size ) ,
blocks_per_bucket_ ( blocks_per_bucket ) ,
env_ ( env ) {
env_ ( env ) ,
max_buckets_ ( max_buckets ) {
env_ - > CreateDirIfMissing ( directory_ ) ;
storage_options_ . use_mmap_writes = false ;
storage_options_ . use_mmap_reads = false ;
buckets_size_ = 0 ;
buckets_ = new unique_ptr < RandomRWFile > [ max_buckets_ ] ;
CreateNewBucket ( ) ;
}
BlobStore : : ~ BlobStore ( ) {
// TODO we don't care about recovery for now
delete [ ] buckets_ ;
}
Status BlobStore : : Put ( const Slice & value , Blob * blob ) {
@ -129,13 +135,12 @@ Status BlobStore::Put(const Slice& value, Blob* blob) {
if ( ! s . ok ( ) ) {
return s ;
}
ReadLock l ( & buckets_mutex_ ) ;
size_t size_left = value . size ( ) ;
uint64_t offset = 0 ; // in bytes, not blocks
for ( auto chunk : blob - > chunks ) {
uint64_t write_size = min ( chunk . size * block_size_ , size_left ) ;
assert ( chunk . bucket_id < buckets_ . size ( ) ) ;
assert ( chunk . bucket_id < buckets_size_ ) ;
s = buckets_ [ chunk . bucket_id ] . get ( ) - > Write ( chunk . offset * block_size_ ,
Slice ( value . data ( ) + offset ,
write_size ) ) ;
@ -164,18 +169,19 @@ Status BlobStore::Put(const Slice& value, Blob* blob) {
Status BlobStore : : Get ( const Blob & blob ,
string * value ) const {
ReadLock l ( & buckets_mutex_ ) ;
// assert that it doesn't overlap with free list
// it will get compiled out for release
assert ( ! free_list_ . Overlap ( blob ) ) ;
{
// assert that it doesn't overlap with free list
// it will get compiled out for release
MutexLock l ( & free_list_mutex_ ) ;
assert ( ! free_list_ . Overlap ( blob ) ) ;
}
value - > resize ( blob . Size ( ) * block_size_ ) ;
uint64_t offset = 0 ; // in bytes, not blocks
for ( auto chunk : blob . chunks ) {
Slice result ;
assert ( chunk . bucket_id < buckets_ . size ( ) ) ;
assert ( chunk . bucket_id < buckets_size_ ) ;
Status s ;
s = buckets_ [ chunk . bucket_id ] . get ( ) - > Read ( chunk . offset * block_size_ ,
chunk . size * block_size_ ,
@ -200,8 +206,7 @@ Status BlobStore::Delete(const Blob& blob) {
}
Status BlobStore : : Sync ( ) {
ReadLock l ( & buckets_mutex_ ) ;
for ( size_t i = 0 ; i < buckets_ . size ( ) ; + + i ) {
for ( size_t i = 0 ; i < buckets_size_ ; + + i ) {
Status s = buckets_ [ i ] . get ( ) - > Sync ( ) ;
if ( ! s . ok ( ) ) {
return s ;
@ -228,10 +233,13 @@ Status BlobStore::Allocate(uint32_t blocks, Blob* blob) {
// called with free_list_mutex_ held
Status BlobStore : : CreateNewBucket ( ) {
WriteLock l ( & buckets_mutex_ ) ;
int new_bucket_id ;
new_bucket_id = buckets_ . size ( ) ;
buckets_ . push_back ( unique_ptr < RandomRWFile > ( ) ) ;
MutexLock l ( & buckets_mutex_ ) ;
if ( buckets_size_ > = max_buckets_ ) {
return Status : : IOError ( " Max size exceeded \n " ) ;
}
int new_bucket_id = buckets_size_ ;
char fname [ 200 ] ;
sprintf ( fname , " %s/%d.bs " , directory_ . c_str ( ) , new_bucket_id ) ;
@ -240,7 +248,6 @@ Status BlobStore::CreateNewBucket() {
& buckets_ [ new_bucket_id ] ,
storage_options_ ) ;
if ( ! s . ok ( ) ) {
buckets_ . erase ( buckets_ . begin ( ) + new_bucket_id ) ;
return s ;
}
@ -249,6 +256,8 @@ Status BlobStore::CreateNewBucket() {
// (also, tmpfs does not support allocate)
buckets_ [ new_bucket_id ] . get ( ) - > Allocate ( 0 , block_size_ * blocks_per_bucket_ ) ;
buckets_size_ = new_bucket_id + 1 ;
return free_list_ . Free ( Blob ( new_bucket_id , 0 , blocks_per_bucket_ ) ) ;
}