@ -11,7 +11,7 @@ use p2p_repo::store::*;
use p2p_repo ::types ::* ;
use p2p_repo ::utils ::* ;
use debug_print ::* ;
use p2p_repo ::log ::* ;
use std ::path ::Path ;
use std ::sync ::{ Arc , RwLock } ;
@ -99,7 +99,7 @@ impl RepoStore for LmdbRepoStore {
Err ( _e ) = > Err ( StorageError ::InvalidValue ) ,
Ok ( o ) = > {
if o . id ( ) ! = * block_id {
debug_println ! (
log_ debug! (
"Invalid ObjectId.\nExp: {:?}\nGot: {:?}\nContent: {:?}" ,
block_id ,
o . id ( ) ,
@ -114,7 +114,7 @@ impl RepoStore for LmdbRepoStore {
}
}
/// Adds a block in the storage backend.
/// Adds a block in the storage backend.
/// The block is persisted to disk.
/// Returns the BlockId of the Block.
fn put ( & self , block : & Block ) -> Result < BlockId , StorageError > {
@ -201,7 +201,6 @@ impl RepoStore for LmdbRepoStore {
writer . commit ( ) . unwrap ( ) ;
Ok ( ( block , slice . len ( ) ) )
}
}
impl LmdbRepoStore {
@ -217,7 +216,7 @@ impl LmdbRepoStore {
. unwrap ( ) ;
let env = shared_rkv . read ( ) . unwrap ( ) ;
println !(
log_debug ! (
"created env with LMDB Version: {} key: {}" ,
env . version ( ) ,
hex ::encode ( & key )
@ -352,7 +351,7 @@ impl LmdbRepoStore {
if ! meta . pin {
// we add an entry to recently_used_store with now
println !( "adding to LRU" ) ;
log_debug ! ( "adding to LRU" ) ;
self . add_to_lru ( & mut writer , & block_id_ser , & now ) . unwrap ( ) ;
}
}
@ -362,7 +361,7 @@ impl LmdbRepoStore {
synced : true ,
last_used : now ,
} ;
println !( "adding to LRU also" ) ;
log_debug ! ( "adding to LRU also" ) ;
self . add_to_lru ( & mut writer , & block_id_ser , & now ) . unwrap ( ) ;
}
}
@ -396,7 +395,7 @@ impl LmdbRepoStore {
while let Some ( Ok ( mut sub_iter ) ) = iter . next ( ) {
while let Some ( Ok ( k ) ) = sub_iter . next ( ) {
//println !("removing {:?} {:?}", k.0, k.1);
//log_debug !("removing {:?} {:?}", k.0, k.1);
let block_id = serde_bare ::from_slice ::< ObjectId > ( k . 1 ) . unwrap ( ) ;
block_ids . push ( block_id ) ;
}
@ -430,7 +429,7 @@ impl LmdbRepoStore {
}
for block_id in block_ids {
let ( block , block_size ) = self . del ( & block_id ) . unwrap ( ) ;
println !( "removed {:?}" , block_id ) ;
log_debug ! ( "removed {:?}" , block_id ) ;
total + = block_size ;
if total > = size {
break ;
@ -468,25 +467,25 @@ impl LmdbRepoStore {
fn list_all ( & self ) {
let lock = self . environment . read ( ) . unwrap ( ) ;
let reader = lock . read ( ) . unwrap ( ) ;
println !( "MAIN" ) ;
log_debug ! ( "MAIN" ) ;
let mut iter = self . main_store . iter_start ( & reader ) . unwrap ( ) ;
while let Some ( Ok ( entry ) ) = iter . next ( ) {
println !( "{:?} {:?}" , entry . 0 , entry . 1 )
log_debug ! ( "{:?} {:?}" , entry . 0 , entry . 1 )
}
println !( "META" ) ;
log_debug ! ( "META" ) ;
let mut iter2 = self . meta_store . iter_start ( & reader ) . unwrap ( ) ;
while let Some ( Ok ( entry ) ) = iter2 . next ( ) {
println !( "{:?} {:?}" , entry . 0 , entry . 1 )
log_debug ! ( "{:?} {:?}" , entry . 0 , entry . 1 )
}
println !( "EXPIRY" ) ;
log_debug ! ( "EXPIRY" ) ;
let mut iter3 = self . expiry_store . iter_start ( & reader ) . unwrap ( ) ;
while let Some ( Ok ( entry ) ) = iter3 . next ( ) {
println !( "{:?} {:?}" , entry . 0 , entry . 1 )
log_debug ! ( "{:?} {:?}" , entry . 0 , entry . 1 )
}
println !( "LRU" ) ;
log_debug ! ( "LRU" ) ;
let mut iter4 = self . recently_used_store . iter_start ( & reader ) . unwrap ( ) ;
while let Some ( Ok ( entry ) ) = iter4 . next ( ) {
println !( "{:?} {:?}" , entry . 0 , entry . 1 )
log_debug ! ( "{:?} {:?}" , entry . 0 , entry . 1 )
}
}
}
@ -494,6 +493,7 @@ impl LmdbRepoStore {
mod test {
use crate ::repo_store ::LmdbRepoStore ;
use p2p_repo ::log ::* ;
use p2p_repo ::store ::* ;
use p2p_repo ::types ::* ;
use p2p_repo ::utils ::* ;
@ -511,7 +511,7 @@ mod test {
let root = Builder ::new ( ) . prefix ( path_str ) . tempdir ( ) . unwrap ( ) ;
let key : [ u8 ; 32 ] = [ 0 ; 32 ] ;
fs ::create_dir_all ( root . path ( ) ) . unwrap ( ) ;
println !( "{}" , root . path ( ) . to_str ( ) . unwrap ( ) ) ;
log_debug ! ( "{}" , root . path ( ) . to_str ( ) . unwrap ( ) ) ;
let mut store = LmdbRepoStore ::open ( root . path ( ) , key ) ;
let mut now = now_timestamp ( ) ;
now - = 200 ;
@ -525,14 +525,14 @@ mod test {
None ,
) ;
let block_id = store . put ( & block ) . unwrap ( ) ;
println !( "#{} -> objId {:?}" , x , block_id ) ;
log_debug ! ( "#{} -> objId {:?}" , x , block_id ) ;
store
. has_been_synced ( & block_id , Some ( now + x as u32 ) )
. unwrap ( ) ;
}
let ret = store . remove_least_used ( 200 ) ;
println !( "removed {}" , ret ) ;
log_debug ! ( "removed {}" , ret ) ;
assert_eq! ( ret , 208 )
//store.list_all();
@ -544,7 +544,7 @@ mod test {
let root = Builder ::new ( ) . prefix ( path_str ) . tempdir ( ) . unwrap ( ) ;
let key : [ u8 ; 32 ] = [ 0 ; 32 ] ;
fs ::create_dir_all ( root . path ( ) ) . unwrap ( ) ;
println !( "{}" , root . path ( ) . to_str ( ) . unwrap ( ) ) ;
log_debug ! ( "{}" , root . path ( ) . to_str ( ) . unwrap ( ) ) ;
let mut store = LmdbRepoStore ::open ( root . path ( ) , key ) ;
let mut now = now_timestamp ( ) ;
now - = 200 ;
@ -558,7 +558,7 @@ mod test {
None ,
) ;
let obj_id = store . put ( & block ) . unwrap ( ) ;
println !( "#{} -> objId {:?}" , x , obj_id ) ;
log_debug ! ( "#{} -> objId {:?}" , x , obj_id ) ;
store . set_pin ( & obj_id , true ) . unwrap ( ) ;
store
. has_been_synced ( & obj_id , Some ( now + x as u32 ) )
@ -566,7 +566,7 @@ mod test {
}
let ret = store . remove_least_used ( 200 ) ;
println !( "removed {}" , ret ) ;
log_debug ! ( "removed {}" , ret ) ;
assert_eq! ( ret , 0 ) ;
store . list_all ( ) ;
@ -601,7 +601,7 @@ mod test {
let root = Builder ::new ( ) . prefix ( path_str ) . tempdir ( ) . unwrap ( ) ;
let key : [ u8 ; 32 ] = [ 0 ; 32 ] ;
fs ::create_dir_all ( root . path ( ) ) . unwrap ( ) ;
println !( "{}" , root . path ( ) . to_str ( ) . unwrap ( ) ) ;
log_debug ! ( "{}" , root . path ( ) . to_str ( ) . unwrap ( ) ) ;
let mut store = LmdbRepoStore ::open ( root . path ( ) , key ) ;
let now = now_timestamp ( ) ;
@ -619,7 +619,7 @@ mod test {
now + 10 ,
] ;
let mut block_ids : Vec < ObjectId > = Vec ::with_capacity ( 11 ) ;
println !( "now {}" , now ) ;
log_debug ! ( "now {}" , now ) ;
let mut i = 0 u8 ;
for expiry in list {
@ -632,7 +632,7 @@ mod test {
None ,
) ;
let block_id = store . put ( & block ) . unwrap ( ) ;
println !( "#{} -> objId {:?}" , i , block_id ) ;
log_debug ! ( "#{} -> objId {:?}" , i , block_id ) ;
block_ids . push ( block_id ) ;
i + = 1 ;
}
@ -655,7 +655,7 @@ mod test {
let root = Builder ::new ( ) . prefix ( path_str ) . tempdir ( ) . unwrap ( ) ;
let key : [ u8 ; 32 ] = [ 0 ; 32 ] ;
fs ::create_dir_all ( root . path ( ) ) . unwrap ( ) ;
println !( "{}" , root . path ( ) . to_str ( ) . unwrap ( ) ) ;
log_debug ! ( "{}" , root . path ( ) . to_str ( ) . unwrap ( ) ) ;
let mut store = LmdbRepoStore ::open ( root . path ( ) , key ) ;
let now = now_timestamp ( ) ;
@ -668,7 +668,7 @@ mod test {
now - 2 , //#5 should be removed, and above
] ;
let mut block_ids : Vec < ObjectId > = Vec ::with_capacity ( 6 ) ;
println !( "now {}" , now ) ;
log_debug ! ( "now {}" , now ) ;
let mut i = 0 u8 ;
for expiry in list {
@ -681,7 +681,7 @@ mod test {
None ,
) ;
let block_id = store . put ( & block ) . unwrap ( ) ;
println !( "#{} -> objId {:?}" , i , block_id ) ;
log_debug ! ( "#{} -> objId {:?}" , i , block_id ) ;
block_ids . push ( block_id ) ;
i + = 1 ;
}
@ -702,7 +702,7 @@ mod test {
let root = Builder ::new ( ) . prefix ( path_str ) . tempdir ( ) . unwrap ( ) ;
let key : [ u8 ; 32 ] = [ 0 ; 32 ] ;
fs ::create_dir_all ( root . path ( ) ) . unwrap ( ) ;
println !( "{}" , root . path ( ) . to_str ( ) . unwrap ( ) ) ;
log_debug ! ( "{}" , root . path ( ) . to_str ( ) . unwrap ( ) ) ;
let store = LmdbRepoStore ::open ( root . path ( ) , key ) ;
store . remove_expired ( ) . unwrap ( ) ;
}
@ -715,7 +715,7 @@ mod test {
let key : [ u8 ; 32 ] = [ 0 ; 32 ] ;
fs ::create_dir_all ( root . path ( ) ) . unwrap ( ) ;
println !( "{}" , root . path ( ) . to_str ( ) . unwrap ( ) ) ;
log_debug ! ( "{}" , root . path ( ) . to_str ( ) . unwrap ( ) ) ;
let mut store = LmdbRepoStore ::open ( root . path ( ) , key ) ;
@ -730,7 +730,7 @@ mod test {
let block_id = store . put ( & block ) . unwrap ( ) ;
assert_eq! ( block_id , block . id ( ) ) ;
println !( "ObjectId: {:?}" , block_id ) ;
log_debug ! ( "ObjectId: {:?}" , block_id ) ;
assert_eq! (
block_id ,
Digest ::Blake3Digest32 ( [
@ -741,7 +741,7 @@ mod test {
let block_res = store . get ( & block_id ) . unwrap ( ) ;
println !( "Block: {:?}" , block_res ) ;
log_debug ! ( "Block: {:?}" , block_res ) ;
assert_eq! ( block_res . id ( ) , block . id ( ) ) ;
}
@ -755,7 +755,7 @@ mod test {
{
fs ::create_dir_all ( root . path ( ) ) . unwrap ( ) ;
println !( "{}" , root . path ( ) . to_str ( ) . unwrap ( ) ) ;
log_debug ! ( "{}" , root . path ( ) . to_str ( ) . unwrap ( ) ) ;
let mut manager = Manager ::< LmdbEnvironment > ::singleton ( ) . write ( ) . unwrap ( ) ;
let shared_rkv = manager
@ -766,7 +766,7 @@ mod test {
. unwrap ( ) ;
let env = shared_rkv . read ( ) . unwrap ( ) ;
println !( "LMDB Version: {}" , env . version ( ) ) ;
log_debug ! ( "LMDB Version: {}" , env . version ( ) ) ;
let store = env . open_single ( "testdb" , StoreOptions ::create ( ) ) . unwrap ( ) ;
@ -823,12 +823,12 @@ mod test {
let reader = env . read ( ) . expect ( "reader" ) ;
let stat = store . stat ( & reader ) . unwrap ( ) ;
println !( "LMDB stat page_size : {}" , stat . page_size ( ) ) ;
println !( "LMDB stat depth : {}" , stat . depth ( ) ) ;
println !( "LMDB stat branch_pages : {}" , stat . branch_pages ( ) ) ;
println !( "LMDB stat leaf_pages : {}" , stat . leaf_pages ( ) ) ;
println !( "LMDB stat overflow_pages : {}" , stat . overflow_pages ( ) ) ;
println !( "LMDB stat entries : {}" , stat . entries ( ) ) ;
log_debug ! ( "LMDB stat page_size : {}" , stat . page_size ( ) ) ;
log_debug ! ( "LMDB stat depth : {}" , stat . depth ( ) ) ;
log_debug ! ( "LMDB stat branch_pages : {}" , stat . branch_pages ( ) ) ;
log_debug ! ( "LMDB stat leaf_pages : {}" , stat . leaf_pages ( ) ) ;
log_debug ! ( "LMDB stat overflow_pages : {}" , stat . overflow_pages ( ) ) ;
log_debug ! ( "LMDB stat entries : {}" , stat . entries ( ) ) ;
}
// {
@ -838,17 +838,17 @@ mod test {
// let reader = env.read().expect("reader");
// // Keys are `AsRef<u8>`, and the return value is `Result<Option<Value>, StoreError>`.
// // println !("Get int {:?}", store.get(&reader, "int").unwrap());
// // println !("Get uint {:?}", store.get(&reader, "uint").unwrap());
// // println !("Get float {:?}", store.get(&reader, "float").unwrap());
// // println !("Get instant {:?}", store.get(&reader, "instant").unwrap());
// // println !("Get boolean {:?}", store.get(&reader, "boolean").unwrap());
// // println !("Get string {:?}", store.get(&reader, "string").unwrap());
// // println !("Get json {:?}", store.get(&reader, "json").unwrap());
// println !("Get blob {:?}", store.get(&reader, "blob").unwrap());
// // log_debug !("Get int {:?}", store.get(&reader, "int").unwrap());
// // log_debug !("Get uint {:?}", store.get(&reader, "uint").unwrap());
// // log_debug !("Get float {:?}", store.get(&reader, "float").unwrap());
// // log_debug !("Get instant {:?}", store.get(&reader, "instant").unwrap());
// // log_debug !("Get boolean {:?}", store.get(&reader, "boolean").unwrap());
// // log_debug !("Get string {:?}", store.get(&reader, "string").unwrap());
// // log_debug !("Get json {:?}", store.get(&reader, "json").unwrap());
// log_debug !("Get blob {:?}", store.get(&reader, "blob").unwrap());
// // Retrieving a non-existent value returns `Ok(None)`.
// println !(
// log_debug !(
// "Get non-existent value {:?}",
// store.get(&reader, "non-existent").unwrap()
// );
@ -864,7 +864,7 @@ mod test {
// store.put(&mut writer, "foo", &Value::Blob(b"bar")).unwrap();
// writer.abort();
// let reader = env.read().expect("reader");
// println !(
// log_debug !(
// "It should be None! ({:?})",
// store.get(&reader, "foo").unwrap()
// );
@ -879,7 +879,7 @@ mod test {
// store.put(&mut writer, "foo", &Value::Blob(b"bar")).unwrap();
// }
// let reader = env.read().expect("reader");
// println !(
// log_debug !(
// "It should be None! ({:?})",
// store.get(&reader, "foo").unwrap()
// );
@ -898,26 +898,26 @@ mod test {
// // In the code above, "foo" and "bar" were put into the store, then "foo" was
// // deleted so only "bar" will return a result when the database is queried via the
// // writer.
// println !(
// log_debug !(
// "It should be None! ({:?})",
// store.get(&writer, "foo").unwrap()
// );
// println !("Get bar ({:?})", store.get(&writer, "bar").unwrap());
// log_debug !("Get bar ({:?})", store.get(&writer, "bar").unwrap());
// // But a reader won't see that change until the write transaction is committed.
// {
// let reader = env.read().expect("reader");
// println !("Get foo {:?}", store.get(&reader, "foo").unwrap());
// println !("Get bar {:?}", store.get(&reader, "bar").unwrap());
// log_debug !("Get foo {:?}", store.get(&reader, "foo").unwrap());
// log_debug !("Get bar {:?}", store.get(&reader, "bar").unwrap());
// }
// writer.commit().unwrap();
// {
// let reader = env.read().expect("reader");
// println !(
// log_debug !(
// "It should be None! ({:?})",
// store.get(&reader, "foo").unwrap()
// );
// println !("Get bar {:?}", store.get(&reader, "bar").unwrap());
// log_debug !("Get bar {:?}", store.get(&reader, "bar").unwrap());
// }
// // Committing a transaction consumes the writer, preventing you from reusing it by
@ -943,11 +943,11 @@ mod test {
// // {
// // let reader = env.read().expect("reader");
// // println !(
// // log_debug !(
// // "It should be None! ({:?})",
// // store.get(&reader, "foo").unwrap()
// // );
// // println !(
// // log_debug !(
// // "It should be None! ({:?})",
// // store.get(&reader, "bar").unwrap()
// // );
@ -956,17 +956,17 @@ mod test {
let stat = env . stat ( ) . unwrap ( ) ;
let info = env . info ( ) . unwrap ( ) ;
println !( "LMDB info map_size : {}" , info . map_size ( ) ) ;
println !( "LMDB info last_pgno : {}" , info . last_pgno ( ) ) ;
println !( "LMDB info last_txnid : {}" , info . last_txnid ( ) ) ;
println !( "LMDB info max_readers : {}" , info . max_readers ( ) ) ;
println !( "LMDB info num_readers : {}" , info . num_readers ( ) ) ;
println !( "LMDB stat page_size : {}" , stat . page_size ( ) ) ;
println !( "LMDB stat depth : {}" , stat . depth ( ) ) ;
println !( "LMDB stat branch_pages : {}" , stat . branch_pages ( ) ) ;
println !( "LMDB stat leaf_pages : {}" , stat . leaf_pages ( ) ) ;
println !( "LMDB stat overflow_pages : {}" , stat . overflow_pages ( ) ) ;
println !( "LMDB stat entries : {}" , stat . entries ( ) ) ;
log_debug ! ( "LMDB info map_size : {}" , info . map_size ( ) ) ;
log_debug ! ( "LMDB info last_pgno : {}" , info . last_pgno ( ) ) ;
log_debug ! ( "LMDB info last_txnid : {}" , info . last_txnid ( ) ) ;
log_debug ! ( "LMDB info max_readers : {}" , info . max_readers ( ) ) ;
log_debug ! ( "LMDB info num_readers : {}" , info . num_readers ( ) ) ;
log_debug ! ( "LMDB stat page_size : {}" , stat . page_size ( ) ) ;
log_debug ! ( "LMDB stat depth : {}" , stat . depth ( ) ) ;
log_debug ! ( "LMDB stat branch_pages : {}" , stat . branch_pages ( ) ) ;
log_debug ! ( "LMDB stat leaf_pages : {}" , stat . leaf_pages ( ) ) ;
log_debug ! ( "LMDB stat overflow_pages : {}" , stat . overflow_pages ( ) ) ;
log_debug ! ( "LMDB stat entries : {}" , stat . entries ( ) ) ;
}
// We reopen the env and data to see if it was well saved to disk.
{
@ -979,13 +979,13 @@ mod test {
. unwrap ( ) ;
let env = shared_rkv . read ( ) . unwrap ( ) ;
println !( "LMDB Version: {}" , env . version ( ) ) ;
log_debug ! ( "LMDB Version: {}" , env . version ( ) ) ;
let mut store = env . open_single ( "testdb" , StoreOptions ::default ( ) ) . unwrap ( ) ; //StoreOptions::create()
{
let reader = env . read ( ) . expect ( "reader" ) ;
println !(
log_debug ! (
"It should be baz! ({:?})" ,
store . get ( & reader , "bar" ) . unwrap ( )
) ;