@ -16,6 +16,8 @@
# include <unordered_set>
# include "rocksdb/cache.h"
# include "rocksdb/options.h"
# include "rocksdb/slice_transform.h"
# include "rocksdb/table.h"
# include "rocksdb/db.h"
# include "rocksdb/utilities/stackable_db.h"
@ -244,13 +246,76 @@ std::string FeatureSet::DebugString() const {
return out + " } " ;
}
class ValueGetter {
public :
ValueGetter ( ) { }
virtual ~ ValueGetter ( ) { }
virtual bool Get ( uint64_t id ) = 0 ;
virtual const Slice value ( ) const = 0 ;
virtual Status status ( ) const = 0 ;
} ;
class ValueGetterFromDB : public ValueGetter {
public :
ValueGetterFromDB ( DB * db , ColumnFamilyHandle * cf ) : db_ ( db ) , cf_ ( cf ) { }
virtual bool Get ( uint64_t id ) override {
std : : string encoded_id ;
PutFixed64BigEndian ( & encoded_id , id ) ;
status_ = db_ - > Get ( ReadOptions ( ) , cf_ , encoded_id , & value_ ) ;
if ( status_ . IsNotFound ( ) ) {
status_ = Status : : Corruption ( " Index inconsistency " ) ;
return false ;
}
return true ;
}
virtual const Slice value ( ) const override { return value_ ; }
virtual Status status ( ) const override { return status_ ; }
private :
std : : string value_ ;
DB * db_ ;
ColumnFamilyHandle * cf_ ;
Status status_ ;
} ;
class ValueGetterFromIterator : public ValueGetter {
public :
explicit ValueGetterFromIterator ( Iterator * iterator ) : iterator_ ( iterator ) { }
virtual bool Get ( uint64_t id ) override {
std : : string encoded_id ;
PutFixed64BigEndian ( & encoded_id , id ) ;
iterator_ - > Seek ( encoded_id ) ;
if ( ! iterator_ - > Valid ( ) | | iterator_ - > key ( ) ! = Slice ( encoded_id ) ) {
status_ = Status : : Corruption ( " Index inconsistency " ) ;
return false ;
}
return true ;
}
virtual const Slice value ( ) const override { return iterator_ - > value ( ) ; }
virtual Status status ( ) const override { return status_ ; }
private :
std : : unique_ptr < Iterator > iterator_ ;
Status status_ ;
} ;
class SpatialIndexCursor : public Cursor {
public :
// tile_box is inclusive
SpatialIndexCursor ( Iterator * spatial_iterator , Iterator * data_iterator ,
SpatialIndexCursor ( Iterator * spatial_iterator , ValueGetter * value_gette r,
const BoundingBox < uint64_t > & tile_bbox , uint32_t tile_bits )
: data_iterator_ ( data_iterator ) ,
valid_ ( true ) {
: value_getter_ ( value_getter ) , valid_ ( true ) {
// calculate quad keys we'll need to query
std : : vector < uint64_t > quad_keys ;
quad_keys . reserve ( ( tile_bbox . max_x - tile_bbox . min_x + 1 ) *
@ -329,7 +394,7 @@ class SpatialIndexCursor : public Cursor {
if ( ! status_ . ok ( ) ) {
return status_ ;
}
return data_iterato r_- > status ( ) ;
return value_gette r_- > status ( ) ;
}
private :
@ -356,32 +421,23 @@ class SpatialIndexCursor : public Cursor {
return true ;
}
// doesn't return anything, but sets valid_ and status_ on corruption
void ExtractData ( ) {
assert ( valid_ ) ;
std : : string encoded_id ;
PutFixed64BigEndian ( & encoded_id , * primary_keys_iterator_ ) ;
valid_ = value_getter_ - > Get ( * primary_keys_iterator_ ) ;
data_iterator_ - > Seek ( encoded_id ) ;
if ( ! data_iterator_ - > Valid ( ) | |
data_iterator_ - > key ( ) ! = Slice ( encoded_id ) ) {
status_ = Status : : Corruption ( " Index inconsistency " ) ;
valid_ = false ;
return ;
if ( valid_ ) {
Slice data = value_getter_ - > value ( ) ;
current_feature_set_ . Clear ( ) ;
if ( ! GetLengthPrefixedSlice ( & data , & current_blob_ ) | |
! current_feature_set_ . Deserialize ( data ) ) {
status_ = Status : : Corruption ( " Primary key column family corruption " ) ;
valid_ = false ;
}
}
Slice data = data_iterator_ - > value ( ) ;
current_feature_set_ . Clear ( ) ;
if ( ! GetLengthPrefixedSlice ( & data , & current_blob_ ) | |
! current_feature_set_ . Deserialize ( data ) ) {
status_ = Status : : Corruption ( " Primary key column family corruption " ) ;
valid_ = false ;
return ;
}
}
unique_ptr < Iterator > data_iterato r_;
unique_ptr < ValueGetter > value_getter_ ;
bool valid_ ;
Status status_ ;
@ -427,10 +483,11 @@ class SpatialDBImpl : public SpatialDB {
DB * db , ColumnFamilyHandle * data_column_family ,
const std : : vector < std : : pair < SpatialIndexOptions , ColumnFamilyHandle * > > &
spatial_indexes ,
uint64_t next_id )
uint64_t next_id , bool read_only )
: SpatialDB ( db ) ,
data_column_family_ ( data_column_family ) ,
next_id_ ( next_id ) {
next_id_ ( next_id ) ,
read_only_ ( read_only ) {
for ( const auto & index : spatial_indexes ) {
name_to_index_ . insert (
{ index . first . name , IndexColumnFamily ( index . first , index . second ) } ) ;
@ -521,17 +578,26 @@ class SpatialDBImpl : public SpatialDB {
return new ErrorCursor ( Status : : InvalidArgument (
" Spatial index " + spatial_index + " not found " ) ) ;
}
const auto & si = itr - > second . index ;
Iterator * spatial_iterator ;
ValueGetter * value_getter ;
std : : vector < Iterator * > iterators ;
Status s = NewIterators ( read_options ,
{ data_column_family_ , itr - > second . column_family } ,
& iterators ) ;
if ( ! s . ok ( ) ) {
return new ErrorCursor ( s ) ;
}
if ( read_only_ ) {
spatial_iterator = NewIterator ( read_options , itr - > second . column_family ) ;
value_getter = new ValueGetterFromDB ( this , data_column_family_ ) ;
} else {
std : : vector < Iterator * > iterators ;
Status s = NewIterators ( read_options ,
{ data_column_family_ , itr - > second . column_family } ,
& iterators ) ;
if ( ! s . ok ( ) ) {
return new ErrorCursor ( s ) ;
}
const auto & si = itr - > second . index ;
return new SpatialIndexCursor ( iterators [ 1 ] , iterators [ 0 ] ,
spatial_iterator = iterators [ 1 ] ;
value_getter = new ValueGetterFromIterator ( iterators [ 0 ] ) ;
}
return new SpatialIndexCursor ( spatial_iterator , value_getter ,
GetTileBoundingBox ( si , bbox ) , si . tile_bits ) ;
}
@ -548,31 +614,61 @@ class SpatialDBImpl : public SpatialDB {
std : : unordered_map < std : : string , IndexColumnFamily > name_to_index_ ;
std : : atomic < uint64_t > next_id_ ;
bool read_only_ ;
} ;
namespace {
Options GetRocksDBOptionsFromOptions ( const SpatialDBOptions & options ) {
Options rocksdb_options ;
rocksdb_options . IncreaseParallelism ( options . num_threads ) ;
rocksdb_options . write_buffer_size = 256 * 1024 * 1024 ; // 256MB
rocksdb_options . max_bytes_for_level_base = 1024 * 1024 * 1024 ; // 1 GB
DBOptions GetDBOptions ( const SpatialDBOptions & options ) {
DBOptions db_options ;
db_options . IncreaseParallelism ( options . num_threads ) ;
if ( options . bulk_load ) {
db_options . disableDataSync = true ;
}
return db_options ;
}
ColumnFamilyOptions GetColumnFamilyOptions ( const SpatialDBOptions & options ,
std : : shared_ptr < Cache > block_cache ) {
ColumnFamilyOptions column_family_options ;
column_family_options . write_buffer_size = 256 * 1024 * 1024 ; // 256MB
column_family_options . max_bytes_for_level_base = 1024 * 1024 * 1024 ; // 1 GB
// only compress levels >= 1
rocksdb_options . compression_per_level . resize ( rocksdb_options . num_levels ) ;
for ( int i = 0 ; i < rocksdb_options . num_levels ; + + i ) {
column_family_options . compression_per_level . resize (
column_family_options . num_levels ) ;
for ( int i = 0 ; i < column_family_options . num_levels ; + + i ) {
if ( i = = 0 ) {
rocksdb_options . compression_per_level [ i ] = kNoCompression ;
column_family _options. compression_per_level [ i ] = kNoCompression ;
} else {
rocksdb_options . compression_per_level [ i ] = kLZ4Compression ;
column_family _options. compression_per_level [ i ] = kLZ4Compression ;
}
}
BlockBasedTableOptions table_options ;
table_options . block_cache = NewLRUCache ( options . cache_size ) ;
rocksdb_options . table_factory . reset ( NewBlockBasedTableFactory ( table_options ) ) ;
table_options . block_cache = block_cache ;
column_family_options . table_factory . reset (
NewBlockBasedTableFactory ( table_options ) ) ;
if ( options . bulk_load ) {
rocksdb_options . PrepareForBulkLoad ( ) ;
}
return rocksdb_options ;
column_family_options . level0_file_num_compaction_trigger = ( 1 < < 30 ) ;
column_family_options . level0_slowdown_writes_trigger = ( 1 < < 30 ) ;
column_family_options . level0_stop_writes_trigger = ( 1 < < 30 ) ;
column_family_options . disable_auto_compactions = true ;
column_family_options . source_compaction_factor = ( 1 < < 30 ) ;
column_family_options . num_levels = 2 ;
column_family_options . target_file_size_base = 256 * 1024 * 1024 ;
column_family_options . max_mem_compaction_level = 0 ;
}
return column_family_options ;
}
ColumnFamilyOptions OptimizeOptionsForDataColumnFamily (
ColumnFamilyOptions options , std : : shared_ptr < Cache > block_cache ) {
options . prefix_extractor . reset ( NewNoopTransform ( ) ) ;
BlockBasedTableOptions block_based_options ;
block_based_options . index_type = BlockBasedTableOptions : : kHashSearch ;
block_based_options . block_cache = block_cache ;
options . table_factory . reset ( NewBlockBasedTableFactory ( block_based_options ) ) ;
return options ;
}
} // namespace
class MetadataStorage {
@ -618,26 +714,30 @@ class MetadataStorage {
Status SpatialDB : : Create (
const SpatialDBOptions & options , const std : : string & name ,
const std : : vector < SpatialIndexOptions > & spatial_indexes ) {
Options rocksdb_options = GetRocksDBOptionsFromOptions ( options ) ;
rocksdb_options . create_if_missing = true ;
rocksdb_options . create_missing_column_families = true ;
rocksdb_options . error_if_exists = true ;
DBOptions db_options = GetDBOptions ( options ) ;
db_options . create_if_missing = true ;
db_options . create_missing_column_families = true ;
db_options . error_if_exists = true ;
auto block_cache = NewLRUCache ( options . cache_size ) ;
ColumnFamilyOptions column_family_options =
GetColumnFamilyOptions ( options , block_cache ) ;
std : : vector < ColumnFamilyDescriptor > column_families ;
column_families . push_back ( ColumnFamilyDescriptor (
kDefaultColumnFamilyName , ColumnFamilyOptions ( rocksdb_options ) ) ) ;
column_families . push_back ( ColumnFamilyDescriptor (
kMetadataColumnFamilyName , ColumnFamilyOptions ( rocksdb_options ) ) ) ;
kDefaultColumnFamilyName ,
OptimizeOptionsForDataColumnFamily ( column_family_options , block_cache ) ) ) ;
column_families . push_back (
ColumnFamilyDescriptor ( kMetadataColumnFamilyName , column_family_options ) ) ;
for ( const auto & index : spatial_indexes ) {
column_families . emplace_back ( GetSpatialIndexColumnFamilyName ( index . name ) ,
ColumnFamilyOptions ( rocksdb_options ) ) ;
column_family_options ) ;
}
std : : vector < ColumnFamilyHandle * > handles ;
DB * base_db ;
Status s = DB : : Open ( DBOptions ( rocksdb_options ) , name , column_families ,
& handles , & base_db ) ;
Status s = DB : : Open ( db_options , name , column_families , & handles , & base_db ) ;
if ( ! s . ok ( ) ) {
return s ;
}
@ -659,13 +759,15 @@ Status SpatialDB::Create(
Status SpatialDB : : Open ( const SpatialDBOptions & options , const std : : string & name ,
SpatialDB * * db , bool read_only ) {
Options rocksdb_options = GetRocksDBOptionsFromOptions ( options ) ;
DBOptions db_options = GetDBOptions ( options ) ;
auto block_cache = NewLRUCache ( options . cache_size ) ;
ColumnFamilyOptions column_family_options =
GetColumnFamilyOptions ( options , block_cache ) ;
Status s ;
std : : vector < std : : string > existing_column_families ;
std : : vector < std : : string > spatial_indexes ;
s = DB : : ListColumnFamilies ( DBOptions ( rocksdb_options ) , name ,
& existing_column_families ) ;
s = DB : : ListColumnFamilies ( db_options , name , & existing_column_families ) ;
if ( ! s . ok ( ) ) {
return s ;
}
@ -678,22 +780,22 @@ Status SpatialDB::Open(const SpatialDBOptions& options, const std::string& name,
std : : vector < ColumnFamilyDescriptor > column_families ;
column_families . push_back ( ColumnFamilyDescriptor (
kDefaultColumnFamilyName , ColumnFamilyOptions ( rocksdb_options ) ) ) ;
column_families . push_back ( ColumnFamilyDescriptor (
kMetadataColumnFamilyName , ColumnFamilyOptions ( rocksdb_options ) ) ) ;
kDefaultColumnFamilyName ,
OptimizeOptionsForDataColumnFamily ( column_family_options , block_cache ) ) ) ;
column_families . push_back (
ColumnFamilyDescriptor ( kMetadataColumnFamilyName , column_family_options ) ) ;
for ( const auto & index : spatial_indexes ) {
column_families . emplace_back ( GetSpatialIndexColumnFamilyName ( index ) ,
ColumnFamilyOptions ( rocksdb_options ) ) ;
column_family_options ) ;
}
std : : vector < ColumnFamilyHandle * > handles ;
DB * base_db ;
if ( read_only ) {
s = DB : : OpenForReadOnly ( DBOptions ( rocks db_options) , name , column_families ,
& handles , & base_db ) ;
s = DB : : OpenForReadOnly ( db_options , name , column_families , & handl es ,
& base_db ) ;
} else {
s = DB : : Open ( DBOptions ( rocksdb_options ) , name , column_families , & handles ,
& base_db ) ;
s = DB : : Open ( db_options , name , column_families , & handles , & base_db ) ;
}
if ( ! s . ok ( ) ) {
return s ;
@ -730,13 +832,13 @@ Status SpatialDB::Open(const SpatialDBOptions& options, const std::string& name,
for ( auto h : handles ) {
delete h ;
}
delete db ;
delete base_ db;
return s ;
}
// I don't need metadata column family any more, so delete it
delete handles [ 1 ] ;
* db = new SpatialDBImpl ( base_db , handles [ 0 ] , index_cf , next_id ) ;
* db = new SpatialDBImpl ( base_db , handles [ 0 ] , index_cf , next_id , read_only ) ;
return Status : : OK ( ) ;
}