switching to rocksdb as storage backend

master
Niko PLP 9 months ago
parent 92eabc251e
commit 54722e1f76
  1. 155
      Cargo.lock
  2. 1
      Cargo.toml
  3. 2
      ng-app/src/App.svelte
  4. 8
      ng-app/src/lib/Home.svelte
  5. 198
      ng-app/src/lib/Install.svelte
  6. 3
      ng-app/src/routes/Home.svelte
  7. 205
      ng-app/src/routes/Install.svelte
  8. 1
      ngone/.gitignore
  9. 2
      ngone/Cargo.toml
  10. BIN
      ngone/data/lock.mdb
  11. 10
      ngone/src/main.rs
  12. 2
      ngone/web/src/App.svelte
  13. 28
      ngone/web/src/routes/Install.svelte
  14. 83
      p2p-broker/src/broker_store/account.rs
  15. 2
      p2p-broker/src/broker_store/invitation.rs
  16. 6
      p2p-repo/src/kcv_store.rs
  17. 1
      p2p-repo/src/store.rs
  18. 12
      stores-lmdb/src/kcv_store.rs
  19. 20
      stores-rocksdb/Cargo.toml
  20. 424
      stores-rocksdb/src/kcv_store.rs
  21. 5
      stores-rocksdb/src/lib.rs
  22. 992
      stores-rocksdb/src/repo_store.rs

155
Cargo.lock generated

@ -477,6 +477,27 @@ dependencies = [
"serde",
]
[[package]]
name = "bindgen"
version = "0.65.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5"
dependencies = [
"bitflags",
"cexpr",
"clang-sys",
"lazy_static",
"lazycell",
"peeking_take_while",
"prettyplease",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
"syn 2.0.18",
]
[[package]]
name = "bit_field"
version = "0.10.2"
@ -621,6 +642,17 @@ dependencies = [
"serde",
]
[[package]]
name = "bzip2-sys"
version = "0.1.11+1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc"
dependencies = [
"cc",
"libc",
"pkg-config",
]
[[package]]
name = "cairo-rs"
version = "0.16.7"
@ -661,6 +693,9 @@ name = "cc"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
dependencies = [
"jobserver",
]
[[package]]
name = "cesu8"
@ -668,6 +703,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c"
[[package]]
name = "cexpr"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom",
]
[[package]]
name = "cfb"
version = "0.7.3"
@ -746,6 +790,17 @@ dependencies = [
"zeroize",
]
[[package]]
name = "clang-sys"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c688fc74432808e3eb684cae8830a86be1d66a2bd58e1f248ed0960a590baf6f"
dependencies = [
"glob",
"libc",
"libloading",
]
[[package]]
name = "clap"
version = "4.3.5"
@ -2370,6 +2425,15 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130"
[[package]]
name = "jobserver"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2"
dependencies = [
"libc",
]
[[package]]
name = "jpeg-decoder"
version = "0.3.0"
@ -2427,6 +2491,12 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "lazycell"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "lebe"
version = "0.5.2"
@ -2439,12 +2509,46 @@ version = "0.2.146"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b"
[[package]]
name = "libloading"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f"
dependencies = [
"cfg-if",
"winapi",
]
[[package]]
name = "libm"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fc7aa29613bd6a620df431842069224d8bc9011086b1db4c0e0cd47fa03ec9a"
[[package]]
name = "librocksdb-sys"
version = "0.11.0+8.3.2"
source = "git+https://git.nextgraph.org/NextGraph/rust-rocksdb.git?branch=master#520e09b1ada58e90075d0d462876afd3b3f9f6a9"
dependencies = [
"bindgen",
"bzip2-sys",
"cc",
"glob",
"libc",
"libz-sys",
]
[[package]]
name = "libz-sys"
version = "1.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d97137b25e321a73eef1418d1d5d2eda4d77e12813f8e6dead84bc52c5870a7b"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "line-wrap"
version = "0.1.1"
@ -2912,7 +3016,7 @@ dependencies = [
"serde_bytes",
"serde_json",
"slice_as_array",
"stores-lmdb",
"stores-rocksdb",
"tokio",
"warp",
"warp-embed",
@ -3339,6 +3443,12 @@ version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79"
[[package]]
name = "peeking_take_while"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
[[package]]
name = "percent-encoding"
version = "2.3.0"
@ -3603,6 +3713,16 @@ dependencies = [
"termtree",
]
[[package]]
name = "prettyplease"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9825a04601d60621feed79c4e6b56d65db77cdca55cef43b46b0de1096d1c282"
dependencies = [
"proc-macro2",
"syn 2.0.18",
]
[[package]]
name = "proc-macro-crate"
version = "1.3.1"
@ -3925,6 +4045,15 @@ dependencies = [
"uuid 0.8.2",
]
[[package]]
name = "rocksdb"
version = "0.21.0"
source = "git+https://git.nextgraph.org/NextGraph/rust-rocksdb.git?branch=master#520e09b1ada58e90075d0d462876afd3b3f9f6a9"
dependencies = [
"libc",
"librocksdb-sys",
]
[[package]]
name = "rust-embed"
version = "6.7.0"
@ -3970,6 +4099,12 @@ dependencies = [
"num-traits",
]
[[package]]
name = "rustc-hash"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.4.0"
@ -4328,6 +4463,12 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
[[package]]
name = "signal-hook"
version = "0.3.15"
@ -4460,6 +4601,18 @@ dependencies = [
"tempfile",
]
[[package]]
name = "stores-rocksdb"
version = "0.1.0"
dependencies = [
"hex",
"p2p-repo",
"rocksdb",
"serde",
"serde_bare",
"tempfile",
]
[[package]]
name = "string_cache"
version = "0.8.7"

@ -6,6 +6,7 @@ members = [
"p2p-client-ws",
"p2p-verifier",
"stores-lmdb",
"stores-rocksdb",
"ngcli",
"ngd",
"ngone",

@ -29,7 +29,7 @@
import WalletCreate from "./routes/WalletCreate.svelte";
import WalletLogin from "./routes/WalletLogin.svelte";
import UserRegistered from "./routes/UserRegistered.svelte";
import Install from "./routes/Install.svelte";
import Install from "./lib/Install.svelte";
import ng from "./api";

@ -14,14 +14,12 @@
import { link } from "svelte-spa-router";
// @ts-ignore
import Logo from "../assets/nextgraph.svg?component";
import Test from "./Test.svelte";
import { has_wallets, active_wallet } from "../store";
import { onMount } from "svelte";
export let display_login_create = false;
</script>
{#if !$has_wallets || !$active_wallet || display_login_create}
{#if display_login_create}
<main class="container3">
<div class="row">
<Logo class="logo block h-40" alt="NextGraph Logo" />
@ -86,8 +84,6 @@
{:else}
<main class="container3">
<h1>Welcome to test</h1>
<div class="row">
<Test />
</div>
<div class="row" />
</main>
{/if}

File diff suppressed because one or more lines are too long

@ -24,6 +24,7 @@
derived,
} from "../store";
let display_login_create = !$has_wallets || !$active_wallet;
let unsubscribe;
onMount(() => {
const combined = derived([active_wallet, has_wallets], ([$s1, $s2]) => [
@ -43,4 +44,4 @@
});
</script>
<Home />
<Home {display_login_create} />

File diff suppressed because one or more lines are too long

1
ngone/.gitignore vendored

@ -0,0 +1 @@
data

@ -14,7 +14,7 @@ warp-embed = "0.4"
rust-embed = "6"
log = "0.4"
env_logger = "0.10"
stores-lmdb = { path = "../stores-lmdb" }
stores-rocksdb = { path = "../stores-rocksdb" }
p2p-repo = { path = "../p2p-repo", features = ["server_log_output"] }
p2p-net = { path = "../p2p-net" }
ng-wallet = { path = "../ng-wallet" }

Binary file not shown.

@ -29,14 +29,14 @@ use p2p_net::types::{APP_NG_ONE_URL, NG_ONE_URL};
use p2p_repo::log::*;
use p2p_repo::types::*;
use p2p_repo::utils::{generate_keypair, sign, verify};
use stores_lmdb::kcv_store::LmdbKCVStore;
use stores_rocksdb::kcv_store::RocksdbKCVStore;
#[derive(RustEmbed)]
#[folder = "web/dist"]
struct Static;
struct Server {
store: LmdbKCVStore,
store: RocksdbKCVStore,
}
impl Server {
@ -158,7 +158,7 @@ async fn main() {
let key: [u8; 32] = [0; 32];
log_debug!("data directory: {}", dir.to_str().unwrap());
fs::create_dir_all(dir.clone()).unwrap();
let store = LmdbKCVStore::open(&dir, key);
let store = RocksdbKCVStore::open(&dir, key);
if store.is_err() {
return;
}
@ -217,8 +217,8 @@ async fn main() {
log_debug!("CORS: any origin");
cors = cors.allow_any_origin();
}
log::info!("Starting server on http://localhost:3030");
log::info!("Starting server on http://localhost:3032");
warp::serve(api_v1.or(static_files).with(cors))
.run(([127, 0, 0, 1], 3030))
.run(([127, 0, 0, 1], 3032))
.await;
}

@ -14,7 +14,7 @@
import Home from "./routes/Home.svelte";
import WalletCreate from "./routes/WalletCreate.svelte";
import Install from "../../../ng-app/src/routes/Install.svelte";
import Install from "./routes/Install.svelte";
//import URI from "./routes/URI.svelte";
import NotFound from "./routes/NotFound.svelte";

@ -0,0 +1,28 @@
<!--
// Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
-->
<script type="ts">
import { Button } from "flowbite-svelte";
import { link } from "svelte-spa-router";
import Install from "../../../../ng-app/src/lib/Install.svelte";
import { push } from "svelte-spa-router";
import { onMount, onDestroy } from "svelte";
let display_has_wallets_warning = false;
let unsubscribe;
onMount(() => {});
onDestroy(() => {
unsubscribe();
});
</script>
<Install {display_has_wallets_warning} />

@ -27,24 +27,16 @@ pub struct Account<'a> {
}
impl<'a> Account<'a> {
const PREFIX: u8 = b"u"[0];
const PREFIX_CLIENT: u8 = b"d"[0];
// propertie's suffixes
const CLIENT: u8 = b"c"[0];
const ADMIN: u8 = b"a"[0];
//const OVERLAY: u8 = b"o"[0];
const PREFIX_ACCOUNT: u8 = b"a"[0];
const PREFIX_CLIENT: u8 = b"c"[0];
const PREFIX_CLIENT_PROPERTY: u8 = b"d"[0];
// propertie's client suffixes
const INFO: u8 = b"i"[0];
const LAST_SEEN: u8 = b"l"[0];
const ALL_PROPERTIES: [u8; 2] = [Self::CLIENT, Self::ADMIN];
const ALL_CLIENT_PROPERTIES: [u8; 2] = [Self::INFO, Self::LAST_SEEN];
const SUFFIX_FOR_EXIST_CHECK: u8 = Self::ADMIN;
pub fn open(id: &UserId, store: &'a dyn KCVStore) -> Result<Account<'a>, StorageError> {
let opening = Account {
id: id.clone(),
@ -67,12 +59,7 @@ impl<'a> Account<'a> {
if acc.exists() {
return Err(StorageError::AlreadyExists);
}
store.put(
Self::PREFIX,
&to_vec(&id)?,
Some(Self::ADMIN),
to_vec(&admin)?,
)?;
store.put(Self::PREFIX_ACCOUNT, &to_vec(&id)?, None, to_vec(&admin)?)?;
Ok(acc)
}
pub fn get_all_users(
@ -81,7 +68,7 @@ impl<'a> Account<'a> {
) -> Result<Vec<UserId>, StorageError> {
let size = to_vec(&UserId::nil())?.len();
let mut res: Vec<UserId> = vec![];
for user in store.get_all_keys_and_values(Self::PREFIX, size, Some(Self::ADMIN))? {
for user in store.get_all_keys_and_values(Self::PREFIX_ACCOUNT, size, vec![], None)? {
let admin: bool = from_slice(&user.1)?;
if admin == admins {
let id: UserId = from_slice(&user.0[1..user.0.len() - 1])?;
@ -92,11 +79,7 @@ impl<'a> Account<'a> {
}
pub fn exists(&self) -> bool {
self.store
.get(
Self::PREFIX,
&to_vec(&self.id).unwrap(),
Some(Self::SUFFIX_FOR_EXIST_CHECK),
)
.get(Self::PREFIX_ACCOUNT, &to_vec(&self.id).unwrap(), None)
.is_ok()
}
pub fn id(&self) -> UserId {
@ -112,39 +95,31 @@ impl<'a> Account<'a> {
let hash = s.finish();
let client_key = (client.clone(), hash);
let client_key_ser = to_vec(&client_key)?;
let mut client_key_ser = to_vec(&client_key)?;
let info_ser = to_vec(info)?;
self.store.write_transaction(&mut |tx| {
let mut id_and_client = to_vec(&self.id)?;
id_and_client.append(&mut client_key_ser);
if tx
.has_property_value(
Self::PREFIX,
&to_vec(&self.id)?,
Some(Self::CLIENT),
&client_key_ser,
)
.has_property_value(Self::PREFIX_CLIENT, &id_and_client, None, &vec![])
.is_err()
{
tx.put(
Self::PREFIX,
&to_vec(&self.id)?,
Some(Self::CLIENT),
&client_key_ser,
)?;
tx.put(Self::PREFIX_CLIENT, &id_and_client, None, &vec![])?;
}
if tx
.has_property_value(
Self::PREFIX_CLIENT,
&client_key_ser,
Self::PREFIX_CLIENT_PROPERTY,
&id_and_client,
Some(Self::INFO),
&info_ser,
)
.is_err()
{
tx.put(
Self::PREFIX_CLIENT,
&client_key_ser,
Self::PREFIX_CLIENT_PROPERTY,
&id_and_client,
Some(Self::INFO),
&info_ser,
)?;
@ -154,8 +129,8 @@ impl<'a> Account<'a> {
.unwrap()
.as_secs();
tx.replace(
Self::PREFIX_CLIENT,
&client_key_ser,
Self::PREFIX_CLIENT_PROPERTY,
&id_and_client,
Some(Self::LAST_SEEN),
&to_vec(&now)?,
)?;
@ -205,9 +180,9 @@ impl<'a> Account<'a> {
if self
.store
.has_property_value(
Self::PREFIX,
Self::PREFIX_ACCOUNT,
&to_vec(&self.id)?,
Some(Self::ADMIN),
None,
&to_vec(&true)?,
)
.is_ok()
@ -219,12 +194,26 @@ impl<'a> Account<'a> {
pub fn del(&self) -> Result<(), StorageError> {
self.store.write_transaction(&mut |tx| {
if let Ok(clients) = tx.get_all(Self::PREFIX, &to_vec(&self.id)?, Some(Self::CLIENT)) {
let id = to_vec(&self.id)?;
// let mut id_and_client = to_vec(&self.id)?;
// let client_key = (client.clone(), hash);
// let mut client_key_ser = to_vec(&client_key)?;
let client_key = (ClientId::nil(), 0u64);
let mut client_key_ser = to_vec(&client_key)?;
let size = client_key_ser.len() + id.len();
if let Ok(clients) = tx.get_all_keys_and_values(Self::PREFIX_CLIENT, size, id, None) {
for client in clients {
tx.del_all(Self::PREFIX_CLIENT, &client, &Self::ALL_CLIENT_PROPERTIES)?;
tx.del(Self::PREFIX_CLIENT, &client.0, None)?;
tx.del_all(
Self::PREFIX_CLIENT_PROPERTY,
&client.0,
&Self::ALL_CLIENT_PROPERTIES,
)?;
}
}
tx.del_all(Self::PREFIX, &to_vec(&self.id)?, &Self::ALL_PROPERTIES)?;
tx.del(Self::PREFIX_ACCOUNT, &to_vec(&self.id)?, None)?;
Ok(())
})
}

@ -94,7 +94,7 @@ impl<'a> Invitation<'a> {
unique = true;
multi = true;
}
for invite in store.get_all_keys_and_values(Self::PREFIX, size, None)? {
for invite in store.get_all_keys_and_values(Self::PREFIX, size, vec![], None)? {
if invite.0.len() == size + 2 {
let code: [u8; 32] = from_slice(&invite.0[1..invite.0.len() - 1])?;
if invite.0[size + 1] == Self::TYPE {

@ -8,6 +8,8 @@
use crate::store::StorageError;
// TODO:remove mut on self for trait WriteTransaction methods
pub trait WriteTransaction: ReadTransaction {
/// Save a property value to the store.
fn put(
@ -53,6 +55,9 @@ pub trait ReadTransaction {
fn get(&self, prefix: u8, key: &Vec<u8>, suffix: Option<u8>) -> Result<Vec<u8>, StorageError>;
/// Load all the values of a property from the store.
#[deprecated(
note = "KVStore has unique values (since switch from lmdb to rocksdb) use get() instead"
)]
fn get_all(
&self,
prefix: u8,
@ -74,6 +79,7 @@ pub trait ReadTransaction {
&self,
prefix: u8,
key_size: usize,
key_prefix: Vec<u8>,
suffix: Option<u8>,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageError>;
}

@ -38,6 +38,7 @@ pub trait RepoStore {
pub enum StorageError {
NotFound,
InvalidValue,
DifferentValue,
BackendError,
SerializationError,
AlreadyExists,

@ -47,6 +47,7 @@ impl<'a> ReadTransaction for LmdbTransaction<'a> {
&self,
prefix: u8,
key_size: usize,
key_prefix: Vec<u8>,
suffix: Option<u8>,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageError> {
unimplemented!();
@ -253,10 +254,17 @@ impl ReadTransaction for LmdbKCVStore {
&self,
prefix: u8,
key_size: usize,
key_prefix: Vec<u8>,
suffix: Option<u8>,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageError> {
let vec_key_start = vec![0u8; key_size];
let vec_key_end = vec![255u8; key_size];
let mut vec_key_start = key_prefix.clone();
let mut trailing_zeros = vec![0u8; key_size - key_prefix.len()];
vec_key_start.append(&mut trailing_zeros);
let mut vec_key_end = key_prefix.clone();
let mut trailing_max = vec![255u8; key_size - key_prefix.len()];
vec_key_end.append(&mut trailing_max);
let property_start = Self::compute_property(prefix, &vec_key_start, suffix);
let property_end =
Self::compute_property(prefix, &vec_key_end, Some(suffix.unwrap_or(255u8)));

@ -0,0 +1,20 @@
[package]
name = "stores-rocksdb"
version = "0.1.0"
edition = "2021"
license = "MIT/Apache-2.0"
authors = ["Niko PLP <niko@nextgraph.org>"]
description = "P2P stores based on LMDB for NextGraph"
repository = "https://git.nextgraph.org/NextGraph/nextgraph-rs"
[dependencies]
p2p-repo = { path = "../p2p-repo" }
serde = { version = "1.0.142", features = ["derive"] }
serde_bare = "0.5.0"
tempfile = "3"
hex = "0.4.3"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies.rocksdb]
git = "https://git.nextgraph.org/NextGraph/rust-rocksdb.git"
branch = "master"
features = [ ]

@ -0,0 +1,424 @@
// Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
use p2p_repo::kcv_store::*;
use p2p_repo::store::*;
use p2p_repo::types::*;
use p2p_repo::utils::*;
use p2p_repo::log::*;
use std::path::Path;
use std::path::PathBuf;
use std::sync::RwLockReadGuard;
use std::sync::{Arc, RwLock};
use serde::{Deserialize, Serialize};
use serde_bare::error::Error;
use rocksdb::{
ColumnFamilyDescriptor, Direction, Env, ErrorKind, IteratorMode, Options, SingleThreaded,
TransactionDB, TransactionDBOptions, DB,
};
pub struct RocksdbTransaction<'a> {
store: &'a RocksdbKCVStore,
tx: Option<rocksdb::Transaction<'a, TransactionDB>>,
}
impl<'a> RocksdbTransaction<'a> {
fn commit(&mut self) {
self.tx.take().unwrap().commit().unwrap();
}
fn tx(&self) -> &rocksdb::Transaction<'a, TransactionDB> {
self.tx.as_ref().unwrap()
}
}
impl<'a> ReadTransaction for RocksdbTransaction<'a> {
fn get_all_keys_and_values(
&self,
prefix: u8,
key_size: usize,
key_prefix: Vec<u8>,
suffix: Option<u8>,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageError> {
self.store
.get_all_keys_and_values(prefix, key_size, key_prefix, suffix)
}
/// Load a single value property from the store.
fn get(&self, prefix: u8, key: &Vec<u8>, suffix: Option<u8>) -> Result<Vec<u8>, StorageError> {
let property = RocksdbKCVStore::compute_property(prefix, key, suffix);
let mut res = self
.tx()
.get_for_update(property, true)
.map_err(|e| StorageError::BackendError)?;
match res {
Some(val) => Ok(val),
None => Err(StorageError::NotFound),
}
}
/// Load all the values of a property from the store.
fn get_all(
&self,
prefix: u8,
key: &Vec<u8>,
suffix: Option<u8>,
) -> Result<Vec<Vec<u8>>, StorageError> {
unimplemented!();
}
/// Check if a specific value exists for a property from the store.
fn has_property_value(
&self,
prefix: u8,
key: &Vec<u8>,
suffix: Option<u8>,
value: &Vec<u8>,
) -> Result<(), StorageError> {
let property = RocksdbKCVStore::compute_property(prefix, key, suffix);
let exists = self
.tx()
.get_for_update(property, true)
.map_err(|e| StorageError::BackendError)?;
match exists {
Some(stored_value) => {
if stored_value.eq(value) {
Ok(())
} else {
Err(StorageError::DifferentValue)
}
}
None => Err(StorageError::NotFound),
}
}
}
impl<'a> WriteTransaction for RocksdbTransaction<'a> {
/// Save a property value to the store.
fn put(
&mut self,
prefix: u8,
key: &Vec<u8>,
suffix: Option<u8>,
value: &Vec<u8>,
) -> Result<(), StorageError> {
let property = RocksdbKCVStore::compute_property(prefix, key, suffix);
self.tx()
.put(property, value)
.map_err(|e| StorageError::BackendError)?;
Ok(())
}
/// Replace the property of a key (single value) to the store.
fn replace(
&mut self,
prefix: u8,
key: &Vec<u8>,
suffix: Option<u8>,
value: &Vec<u8>,
) -> Result<(), StorageError> {
let property = RocksdbKCVStore::compute_property(prefix, key, suffix);
self.tx()
.put(property, value)
.map_err(|e| StorageError::BackendError)?;
Ok(())
}
/// Delete a property from the store.
fn del(&mut self, prefix: u8, key: &Vec<u8>, suffix: Option<u8>) -> Result<(), StorageError> {
let property = RocksdbKCVStore::compute_property(prefix, key, suffix);
let res = self.tx().delete(property);
if res.is_err() {
if let ErrorKind::NotFound = res.unwrap_err().kind() {
return Ok(());
}
return Err(StorageError::BackendError);
}
Ok(())
}
/// Delete a specific value for a property from the store.
fn del_property_value(
&mut self,
prefix: u8,
key: &Vec<u8>,
suffix: Option<u8>,
value: &Vec<u8>,
) -> Result<(), StorageError> {
let property = RocksdbKCVStore::compute_property(prefix, key, suffix);
let exists = self
.tx()
.get_for_update(property.clone(), true)
.map_err(|e| StorageError::BackendError)?;
match exists {
Some(val) => {
if val.eq(value) {
self.tx()
.delete(property)
.map_err(|e| StorageError::BackendError)?;
}
}
None => return Err(StorageError::DifferentValue),
}
Ok(())
}
/// Delete all properties of a key from the store.
fn del_all(
&mut self,
prefix: u8,
key: &Vec<u8>,
all_suffixes: &[u8],
) -> Result<(), StorageError> {
for suffix in all_suffixes {
self.del(prefix, key, Some(*suffix))?;
}
if all_suffixes.is_empty() {
self.del(prefix, key, None)?;
}
Ok(())
}
}
pub struct RocksdbKCVStore {
/// the main store where all the properties of keys are stored
main_db: TransactionDB,
/// path for the storage backend data
path: String,
}
fn compare<T: Ord>(a: &[T], b: &[T]) -> std::cmp::Ordering {
let mut iter_b = b.iter();
for v in a {
match iter_b.next() {
Some(w) => match v.cmp(w) {
std::cmp::Ordering::Equal => continue,
ord => return ord,
},
None => break,
}
}
return a.len().cmp(&b.len());
}
impl ReadTransaction for RocksdbKCVStore {
fn get_all_keys_and_values(
&self,
prefix: u8,
key_size: usize,
key_prefix: Vec<u8>,
suffix: Option<u8>,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageError> {
if key_prefix.len() > key_size {
return Err(StorageError::InvalidValue);
}
let mut vec_key_start = key_prefix.clone();
let mut trailing_zeros = vec![0u8; key_size - key_prefix.len()];
vec_key_start.append(&mut trailing_zeros);
let mut vec_key_end = key_prefix.clone();
let mut trailing_max = vec![255u8; key_size - key_prefix.len()];
vec_key_end.append(&mut trailing_max);
let property_start = Self::compute_property(prefix, &vec_key_start, suffix);
let property_end =
Self::compute_property(prefix, &vec_key_end, Some(suffix.unwrap_or(255u8)));
let mut iter = self
.main_db
.iterator(IteratorMode::From(&property_start, Direction::Forward));
let mut vector: Vec<(Vec<u8>, Vec<u8>)> = vec![];
while let res = iter.next() {
match res {
Some(Ok(val)) => {
match compare(&val.0, property_end.as_slice()) {
std::cmp::Ordering::Less | std::cmp::Ordering::Equal => {
if suffix.is_some() {
if val.0.len() < (key_size + 2)
|| val.0[1 + key_size] != suffix.unwrap()
{
continue;
}
// } else if val.0.len() > (key_size + 1) {
// continue;
}
vector.push((val.0.to_vec(), val.1.to_vec()));
}
_ => {} //,
}
}
Some(Err(_e)) => return Err(StorageError::BackendError),
None => {
break;
}
}
}
Ok(vector)
}
/// Load a single value property from the store.
fn get(&self, prefix: u8, key: &Vec<u8>, suffix: Option<u8>) -> Result<Vec<u8>, StorageError> {
let property = Self::compute_property(prefix, key, suffix);
let mut res = self
.main_db
.get(property)
.map_err(|e| StorageError::BackendError)?;
match res {
Some(val) => Ok(val),
None => Err(StorageError::NotFound),
}
}
/// Load all the values of a property from the store.
fn get_all(
&self,
prefix: u8,
key: &Vec<u8>,
suffix: Option<u8>,
) -> Result<Vec<Vec<u8>>, StorageError> {
unimplemented!();
}
/// Check if a specific value exists for a property from the store.
fn has_property_value(
&self,
prefix: u8,
key: &Vec<u8>,
suffix: Option<u8>,
value: &Vec<u8>,
) -> Result<(), StorageError> {
let property = Self::compute_property(prefix, key, suffix);
let exists = self
.main_db
.get(property)
.map_err(|e| StorageError::BackendError)?;
match exists {
Some(stored_value) => {
if stored_value.eq(value) {
Ok(())
} else {
Err(StorageError::DifferentValue)
}
}
None => Err(StorageError::NotFound),
}
}
}
impl KCVStore for RocksdbKCVStore {
fn write_transaction(
&self,
method: &mut dyn FnMut(&mut dyn WriteTransaction) -> Result<(), StorageError>,
) -> Result<(), StorageError> {
let tx = self.main_db.transaction();
let mut transaction = RocksdbTransaction {
store: self,
tx: Some(tx),
};
let res = method(&mut transaction);
if res.is_ok() {
transaction.commit();
//lock.sync(true);
}
res
}
/// Save a property value to the store.
fn put(
&self,
prefix: u8,
key: &Vec<u8>,
suffix: Option<u8>,
value: Vec<u8>,
) -> Result<(), StorageError> {
self.write_transaction(&mut |tx| tx.put(prefix, key, suffix, &value))
}
/// Replace the property of a key (single value) to the store.
fn replace(
&self,
prefix: u8,
key: &Vec<u8>,
suffix: Option<u8>,
value: Vec<u8>,
) -> Result<(), StorageError> {
self.write_transaction(&mut |tx| tx.replace(prefix, key, suffix, &value))
}
/// Delete a property from the store.
fn del(&self, prefix: u8, key: &Vec<u8>, suffix: Option<u8>) -> Result<(), StorageError> {
self.write_transaction(&mut |tx| tx.del(prefix, key, suffix))
}
/// Delete a specific value for a property from the store.
fn del_property_value(
&self,
prefix: u8,
key: &Vec<u8>,
suffix: Option<u8>,
value: Vec<u8>,
) -> Result<(), StorageError> {
self.write_transaction(&mut |tx| tx.del_property_value(prefix, key, suffix, &value))
}
/// Delete all properties of a key from the store.
fn del_all(&self, prefix: u8, key: &Vec<u8>, all_suffixes: &[u8]) -> Result<(), StorageError> {
for suffix in all_suffixes {
self.del(prefix, key, Some(*suffix))?;
}
if all_suffixes.is_empty() {
self.del(prefix, key, None)?;
}
Ok(())
}
}
impl RocksdbKCVStore {
pub fn path(&self) -> PathBuf {
PathBuf::from(&self.path)
}
fn compute_property(prefix: u8, key: &Vec<u8>, suffix: Option<u8>) -> Vec<u8> {
let mut new: Vec<u8> = Vec::with_capacity(key.len() + 2);
new.push(prefix);
new.extend(key);
if suffix.is_some() {
new.push(suffix.unwrap())
}
new
}
/// Opens the store and returns a KCVStore object that should be kept and used to manipulate the properties
/// The key is the encryption key for the data at rest.
pub fn open<'a>(path: &Path, key: [u8; 32]) -> Result<RocksdbKCVStore, StorageError> {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let env = Env::enc_env(key).unwrap();
opts.set_env(&env);
let tx_options = TransactionDBOptions::new();
let db: TransactionDB =
TransactionDB::open_cf(&opts, &tx_options, &path, vec!["cf0", "cf1"]).unwrap();
log_info!("created db with Rocksdb Version: {}", Env::version());
Ok(RocksdbKCVStore {
main_db: db,
path: path.to_str().unwrap().to_string(),
})
}
}

@ -0,0 +1,5 @@
// #[cfg(not(target_arch = "wasm32"))]
// pub mod repo_store;
#[cfg(not(target_arch = "wasm32"))]
pub mod kcv_store;

@ -0,0 +1,992 @@
// Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
use p2p_repo::store::*;
use p2p_repo::types::*;
use p2p_repo::utils::*;
use p2p_repo::log::*;
use std::path::Path;
use std::sync::{Arc, RwLock};
use serde::{Deserialize, Serialize};
use serde_bare::error::Error;
#[derive(Debug)]
pub struct LmdbRepoStore {
/// the main store where all the repo blocks are stored
main_store: SingleStore<LmdbDatabase>,
/// store for the pin boolean, recently_used timestamp, and synced boolean
meta_store: SingleStore<LmdbDatabase>,
/// store for the expiry timestamp
expiry_store: MultiIntegerStore<LmdbDatabase, u32>,
/// store for the LRU list
recently_used_store: MultiIntegerStore<LmdbDatabase, u32>,
/// the opened environment so we can create new transactions
environment: Arc<RwLock<Rkv<LmdbEnvironment>>>,
}
// TODO: versioning V0
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
struct BlockMeta {
pub pin: bool,
pub last_used: Timestamp,
pub synced: bool,
}
impl RepoStore for LmdbRepoStore {
/// Retrieves a block from the storage backend.
fn get(&self, block_id: &BlockId) -> Result<Block, StorageError> {
let lock = self.environment.read().unwrap();
let reader = lock.read().unwrap();
let block_id_ser = serde_bare::to_vec(&block_id).unwrap();
let block_ser_res = self.main_store.get(&reader, block_id_ser.clone());
match block_ser_res {
Err(e) => Err(StorageError::BackendError),
Ok(None) => Err(StorageError::NotFound),
Ok(Some(block_ser)) => {
// updating recently_used
// first getting the meta for this BlockId
let meta_ser = self.meta_store.get(&reader, block_id_ser.clone()).unwrap();
match meta_ser {
Some(meta_value) => {
let mut meta =
serde_bare::from_slice::<BlockMeta>(&meta_value.to_bytes().unwrap())
.unwrap();
if meta.synced {
let mut writer = lock.write().unwrap();
let now = now_timestamp();
if !meta.pin {
// we remove the previous timestamp (last_used) from recently_used_store
self.remove_from_lru(&mut writer, &block_id_ser, &meta.last_used)
.unwrap();
// we add an entry to recently_used_store with now
self.add_to_lru(&mut writer, &block_id_ser, &now).unwrap();
}
// we save the new meta (with last_used:now)
meta.last_used = now;
let new_meta_ser = serde_bare::to_vec(&meta).unwrap();
self.meta_store
.put(
&mut writer,
block_id_ser,
&Value::Blob(new_meta_ser.as_slice()),
)
.unwrap();
// commit
writer.commit().unwrap();
}
}
_ => {} // there is no meta. we do nothing since we start to record LRU only once synced == true.
}
match serde_bare::from_slice::<Block>(&block_ser.to_bytes().unwrap()) {
Err(_e) => Err(StorageError::InvalidValue),
Ok(o) => {
if o.id() != *block_id {
log_debug!(
"Invalid ObjectId.\nExp: {:?}\nGot: {:?}\nContent: {:?}",
block_id,
o.id(),
o
);
panic!("CORRUPTION OF DATA !");
}
Ok(o)
}
}
}
}
}
/// Adds a block in the storage backend.
/// The block is persisted to disk.
/// Returns the BlockId of the Block.
fn put(&self, block: &Block) -> Result<BlockId, StorageError> {
let block_ser = serde_bare::to_vec(&block).unwrap();
let block_id = block.id();
let block_id_ser = serde_bare::to_vec(&block_id).unwrap();
let lock = self.environment.read().unwrap();
let mut writer = lock.write().unwrap();
// TODO: check if the block is already in store? if yes, don't put it again.
// I didnt do it yet because it is extra cost. surely a get on the store is lighter than a put
// but doing a get in additing to a put for every call, is probably even costlier. better to deal with that at the higher level
self.main_store
.put(
&mut writer,
&block_id_ser,
&Value::Blob(block_ser.as_slice()),
)
.unwrap();
// if it has an expiry, adding the BlockId to the expiry_store
match block.expiry() {
Some(expiry) => {
self.expiry_store
.put(&mut writer, expiry, &Value::Blob(block_id_ser.as_slice()))
.unwrap();
}
_ => {}
}
writer.commit().unwrap();
Ok(block_id)
}
/// Removes the block from the storage backend.
/// The removed block is returned, so it can be inspected.
/// Also returned is the approximate size of of free space that was reclaimed.
fn del(&self, block_id: &BlockId) -> Result<(Block, usize), StorageError> {
let lock = self.environment.read().unwrap();
let mut writer = lock.write().unwrap();
let block_id_ser = serde_bare::to_vec(&block_id).unwrap();
// retrieving the block itself (we need the expiry)
let block_ser = self
.main_store
.get(&writer, block_id_ser.clone())
.unwrap()
.ok_or(StorageError::NotFound)?;
let slice = block_ser.to_bytes().unwrap();
let block = serde_bare::from_slice::<Block>(&slice).unwrap(); //FIXME propagate error?
let meta_res = self.meta_store.get(&writer, block_id_ser.clone()).unwrap();
if meta_res.is_some() {
let meta = serde_bare::from_slice::<BlockMeta>(&meta_res.unwrap().to_bytes().unwrap())
.unwrap();
if meta.last_used != 0 {
self.remove_from_lru(&mut writer, &block_id_ser.clone(), &meta.last_used)
.unwrap();
}
// removing the meta
self.meta_store
.delete(&mut writer, block_id_ser.clone())
.unwrap();
}
// delete block from main_store
self.main_store
.delete(&mut writer, block_id_ser.clone())
.unwrap();
// remove BlockId from expiry_store, if any expiry
match block.expiry() {
Some(expiry) => {
self.expiry_store
.delete(
&mut writer,
expiry,
&Value::Blob(block_id_ser.clone().as_slice()),
)
.unwrap();
}
_ => {}
}
writer.commit().unwrap();
Ok((block, slice.len()))
}
}
impl LmdbRepoStore {
/// Opens the store and returns a RepoStore object that should be kept and used to call put/get/delete/pin
/// The key is the encryption key for the data at rest.
pub fn open<'a>(path: &Path, key: [u8; 32]) -> Result<LmdbRepoStore, StorageError> {
let mut manager = Manager::<LmdbEnvironment>::singleton().write().unwrap();
let shared_rkv = manager
.get_or_create(path, |path| {
//Rkv::new::<Lmdb>(path) // use this instead to disable encryption
Rkv::with_encryption_key_and_mapsize::<Lmdb>(path, key, 1 * 1024 * 1024 * 1024)
})
.map_err(|e| {
log_debug!("open LMDB failed: {}", e);
StorageError::BackendError
})?;
let env = shared_rkv.read().unwrap();
log_debug!(
"created env with LMDB Version: {} key: {}",
env.version(),
hex::encode(&key)
);
let main_store = env.open_single("main", StoreOptions::create()).unwrap();
let meta_store = env.open_single("meta", StoreOptions::create()).unwrap();
let mut opts = StoreOptions::<LmdbDatabaseFlags>::create();
opts.flags.set(DatabaseFlags::DUP_FIXED, true);
let expiry_store = env.open_multi_integer("expiry", opts).unwrap();
let recently_used_store = env.open_multi_integer("recently_used", opts).unwrap();
Ok(LmdbRepoStore {
environment: shared_rkv.clone(),
main_store,
meta_store,
expiry_store,
recently_used_store,
})
}
//FIXME: use BlockId, not ObjectId. this is a block level operation
/// Pins the object
pub fn pin(&self, object_id: &ObjectId) -> Result<(), StorageError> {
self.set_pin(object_id, true)
}
//FIXME: use BlockId, not ObjectId. this is a block level operation
/// Unpins the object
pub fn unpin(&self, object_id: &ObjectId) -> Result<(), StorageError> {
self.set_pin(object_id, false)
}
//FIXME: use BlockId, not ObjectId. this is a block level operation
/// Sets the pin for that Object. if add is true, will add the pin. if false, will remove the pin.
/// A pin on an object prevents it from being removed when the store is making some disk space by using the LRU.
/// A pin does not override the expiry. If expiry is set and is reached, the obejct will be deleted, no matter what.
pub fn set_pin(&self, object_id: &ObjectId, add: bool) -> Result<(), StorageError> {
let lock = self.environment.read().unwrap();
let mut writer = lock.write().unwrap();
let obj_id_ser = serde_bare::to_vec(&object_id).unwrap();
let meta_ser = self.meta_store.get(&writer, &obj_id_ser).unwrap();
let mut meta;
// if adding a pin, if there is a meta (if already pinned, return) and is synced, remove the last_used timestamp from recently_used_store
// if no meta, create it with pin:true, synced: false
// if removing a pin (if pin already removed, return), if synced, add an entry to recently_used_store with the last_used timestamp (as found in meta, dont use now)
match meta_ser {
Some(meta_value) => {
meta =
serde_bare::from_slice::<BlockMeta>(&meta_value.to_bytes().unwrap()).unwrap();
if add == meta.pin {
// pinning while already pinned, or unpinning while already unpinned. NOP
return Ok(());
};
meta.pin = add;
if meta.synced {
if add {
// we remove the previous timestamp (last_used) from recently_used_store
self.remove_from_lru(&mut writer, &obj_id_ser, &meta.last_used)
.unwrap();
} else {
// we add an entry to recently_used_store with last_used
self.add_to_lru(&mut writer, &obj_id_ser, &meta.last_used)
.unwrap();
}
}
}
None => {
if add {
meta = BlockMeta {
pin: true,
synced: false,
last_used: 0,
}
} else {
// there is no meta, and user wants to unpin, so let's leave everything as it is.
return Ok(());
}
}
}
let new_meta_ser = serde_bare::to_vec(&meta).unwrap();
self.meta_store
.put(
&mut writer,
obj_id_ser,
&Value::Blob(new_meta_ser.as_slice()),
)
.unwrap();
// commit
writer.commit().unwrap();
Ok(())
}
//FIXME: use BlockId, not ObjectId. this is a block level operation
/// the broker calls this method when the block has been retrieved/synced by enough peers and it
/// can now be included in the LRU for potential garbage collection.
/// If this method has not been called on a block, it will be kept in the store and will not enter LRU.
pub fn has_been_synced(&self, block_id: &BlockId, when: Option<u32>) -> Result<(), Error> {
let lock = self.environment.read().unwrap();
let mut writer = lock.write().unwrap();
let block_id_ser = serde_bare::to_vec(&block_id).unwrap();
let meta_ser = self.meta_store.get(&writer, block_id_ser.clone()).unwrap();
let mut meta;
let now = match when {
None => now_timestamp(),
Some(w) => w,
};
// get the meta. if no meta, it is ok, we will create it after (with pin:false and synced:true)
// if already synced, return
// update the meta with last_used:now and synced:true
// if pinned, save and return
// otherwise add an entry to recently_used_store with now
match meta_ser {
Some(meta_value) => {
meta =
serde_bare::from_slice::<BlockMeta>(&meta_value.to_bytes().unwrap()).unwrap();
if meta.synced {
// already synced. NOP
return Ok(());
};
meta.synced = true;
meta.last_used = now;
if !meta.pin {
// we add an entry to recently_used_store with now
log_debug!("adding to LRU");
self.add_to_lru(&mut writer, &block_id_ser, &now).unwrap();
}
}
None => {
meta = BlockMeta {
pin: false,
synced: true,
last_used: now,
};
log_debug!("adding to LRU also");
self.add_to_lru(&mut writer, &block_id_ser, &now).unwrap();
}
}
let new_meta_ser = serde_bare::to_vec(&meta).unwrap();
self.meta_store
.put(
&mut writer,
block_id_ser,
&Value::Blob(new_meta_ser.as_slice()),
)
.unwrap();
// commit
writer.commit().unwrap();
Ok(())
}
/// Removes all the blocks that have expired.
/// The broker should call this method periodically.
pub fn remove_expired(&self) -> Result<(), Error> {
let mut block_ids: Vec<BlockId> = vec![];
{
let lock = self.environment.read().unwrap();
let reader = lock.read().unwrap();
let mut iter = self
.expiry_store
.iter_prev_dup_from(&reader, now_timestamp())
.unwrap();
while let Some(Ok(mut sub_iter)) = iter.next() {
while let Some(Ok(k)) = sub_iter.next() {
//log_debug!("removing {:?} {:?}", k.0, k.1);
let block_id = serde_bare::from_slice::<ObjectId>(k.1).unwrap();
block_ids.push(block_id);
}
}
}
for block_id in block_ids {
self.del(&block_id).unwrap();
}
Ok(())
}
/// Removes some blocks that haven't been used for a while, reclaiming some space on disk.
/// The oldest are removed first, until the total amount of data removed is at least equal to size,
/// or the LRU list became empty. The approximate size of the storage space that was reclaimed is returned.
pub fn remove_least_used(&self, size: usize) -> usize {
let mut block_ids: Vec<BlockId> = vec![];
let mut total: usize = 0;
{
let lock = self.environment.read().unwrap();
let reader = lock.read().unwrap();
let mut iter = self.recently_used_store.iter_start(&reader).unwrap();
while let Some(Ok(entry)) = iter.next() {
let block_id =
serde_bare::from_slice::<ObjectId>(entry.1.to_bytes().unwrap().as_slice())
.unwrap();
block_ids.push(block_id);
}
}
for block_id in block_ids {
let (block, block_size) = self.del(&block_id).unwrap();
log_debug!("removed {:?}", block_id);
total += block_size;
if total >= size {
break;
}
}
total
}
fn remove_from_lru(
&self,
writer: &mut Writer<LmdbRwTransaction>,
block_id_ser: &Vec<u8>,
time: &Timestamp,
) -> Result<(), StoreError> {
self.recently_used_store
.delete(writer, *time, &Value::Blob(block_id_ser.as_slice()))
}
fn add_to_lru(
&self,
writer: &mut Writer<LmdbRwTransaction>,
block_id_ser: &Vec<u8>,
time: &Timestamp,
) -> Result<(), StoreError> {
let mut flag = LmdbWriteFlags::empty();
flag.set(WriteFlags::APPEND_DUP, true);
self.recently_used_store.put_with_flags(
writer,
*time,
&Value::Blob(block_id_ser.as_slice()),
flag,
)
}
fn list_all(&self) {
let lock = self.environment.read().unwrap();
let reader = lock.read().unwrap();
log_debug!("MAIN");
let mut iter = self.main_store.iter_start(&reader).unwrap();
while let Some(Ok(entry)) = iter.next() {
log_debug!("{:?} {:?}", entry.0, entry.1)
}
log_debug!("META");
let mut iter2 = self.meta_store.iter_start(&reader).unwrap();
while let Some(Ok(entry)) = iter2.next() {
log_debug!("{:?} {:?}", entry.0, entry.1)
}
log_debug!("EXPIRY");
let mut iter3 = self.expiry_store.iter_start(&reader).unwrap();
while let Some(Ok(entry)) = iter3.next() {
log_debug!("{:?} {:?}", entry.0, entry.1)
}
log_debug!("LRU");
let mut iter4 = self.recently_used_store.iter_start(&reader).unwrap();
while let Some(Ok(entry)) = iter4.next() {
log_debug!("{:?} {:?}", entry.0, entry.1)
}
}
}
#[cfg(test)]
mod test {
use crate::repo_store::LmdbRepoStore;
use p2p_repo::log::*;
use p2p_repo::store::*;
use p2p_repo::types::*;
use p2p_repo::utils::*;
use rkv::backend::{BackendInfo, BackendStat, Lmdb, LmdbEnvironment};
use rkv::{Manager, Rkv, StoreOptions, Value};
#[allow(unused_imports)]
use std::time::Duration;
#[allow(unused_imports)]
use std::{fs, thread};
use tempfile::Builder;
#[test]
pub fn test_remove_least_used() {
let path_str = "test-env";
let root = Builder::new().prefix(path_str).tempdir().unwrap();
let key: [u8; 32] = [0; 32];
fs::create_dir_all(root.path()).unwrap();
log_debug!("{}", root.path().to_str().unwrap());
let mut store = LmdbRepoStore::open(root.path(), key).unwrap();
let mut now = now_timestamp();
now -= 200;
// TODO: fix the LMDB bug that is triggered with x max set to 86 !!!
for x in 1..85 {
let block = Block::new(
Vec::new(),
ObjectDeps::ObjectIdList(Vec::new()),
None,
vec![x; 10],
None,
);
let block_id = store.put(&block).unwrap();
log_debug!("#{} -> objId {:?}", x, block_id);
store
.has_been_synced(&block_id, Some(now + x as u32))
.unwrap();
}
let ret = store.remove_least_used(200);
log_debug!("removed {}", ret);
assert_eq!(ret, 208)
//store.list_all();
}
#[test]
pub fn test_set_pin() {
let path_str = "test-env";
let root = Builder::new().prefix(path_str).tempdir().unwrap();
let key: [u8; 32] = [0; 32];
fs::create_dir_all(root.path()).unwrap();
log_debug!("{}", root.path().to_str().unwrap());
let mut store = LmdbRepoStore::open(root.path(), key).unwrap();
let mut now = now_timestamp();
now -= 200;
// TODO: fix the LMDB bug that is triggered with x max set to 86 !!!
for x in 1..100 {
let block = Block::new(
Vec::new(),
ObjectDeps::ObjectIdList(Vec::new()),
None,
vec![x; 10],
None,
);
let obj_id = store.put(&block).unwrap();
log_debug!("#{} -> objId {:?}", x, obj_id);
store.set_pin(&obj_id, true).unwrap();
store
.has_been_synced(&obj_id, Some(now + x as u32))
.unwrap();
}
let ret = store.remove_least_used(200);
log_debug!("removed {}", ret);
assert_eq!(ret, 0);
store.list_all();
}
#[test]
pub fn test_get_valid_value_size() {
assert_eq!(store_valid_value_size(0), 4072);
assert_eq!(store_valid_value_size(2), 4072);
assert_eq!(store_valid_value_size(4072), 4072);
assert_eq!(store_valid_value_size(4072 + 1), 4072 + 4096);
assert_eq!(store_valid_value_size(4072 + 4096), 4072 + 4096);
assert_eq!(store_valid_value_size(4072 + 4096 + 1), 4072 + 4096 + 4096);
assert_eq!(
store_valid_value_size(4072 + 4096 + 4096),
4072 + 4096 + 4096
);
assert_eq!(
store_valid_value_size(4072 + 4096 + 4096 + 1),
4072 + 4096 + 4096 + 4096
);
assert_eq!(store_valid_value_size(4072 + 4096 * 511), 4072 + 4096 * 511);
assert_eq!(
store_valid_value_size(4072 + 4096 * 511 + 1),
4072 + 4096 * 511
);
}
#[test]
pub fn test_remove_expired() {
let path_str = "test-env";
let root = Builder::new().prefix(path_str).tempdir().unwrap();
let key: [u8; 32] = [0; 32];
fs::create_dir_all(root.path()).unwrap();
log_debug!("{}", root.path().to_str().unwrap());
let mut store = LmdbRepoStore::open(root.path(), key).unwrap();
let now = now_timestamp();
let list = [
now - 10,
now - 6,
now - 6,
now - 3,
now - 2,
now - 1, //#5 should be removed, and above
now + 3,
now + 4,
now + 4,
now + 5,
now + 10,
];
let mut block_ids: Vec<ObjectId> = Vec::with_capacity(11);
log_debug!("now {}", now);
let mut i = 0u8;
for expiry in list {
//let i: u8 = (expiry + 10 - now).try_into().unwrap();
let block = Block::new(
Vec::new(),
ObjectDeps::ObjectIdList(Vec::new()),
Some(expiry),
[i].to_vec(),
None,
);
let block_id = store.put(&block).unwrap();
log_debug!("#{} -> objId {:?}", i, block_id);
block_ids.push(block_id);
i += 1;
}
store.remove_expired().unwrap();
assert!(store.get(block_ids.get(0).unwrap()).is_err());
assert!(store.get(block_ids.get(1).unwrap()).is_err());
assert!(store.get(block_ids.get(2).unwrap()).is_err());
assert!(store.get(block_ids.get(5).unwrap()).is_err());
assert!(store.get(block_ids.get(6).unwrap()).is_ok());
assert!(store.get(block_ids.get(7).unwrap()).is_ok());
//store.list_all();
}
#[test]
pub fn test_remove_all_expired() {
let path_str = "test-env";
let root = Builder::new().prefix(path_str).tempdir().unwrap();
let key: [u8; 32] = [0; 32];
fs::create_dir_all(root.path()).unwrap();
log_debug!("{}", root.path().to_str().unwrap());
let mut store = LmdbRepoStore::open(root.path(), key).unwrap();
let now = now_timestamp();
let list = [
now - 10,
now - 6,
now - 6,
now - 3,
now - 2,
now - 2, //#5 should be removed, and above
];
let mut block_ids: Vec<ObjectId> = Vec::with_capacity(6);
log_debug!("now {}", now);
let mut i = 0u8;
for expiry in list {
//let i: u8 = (expiry + 10 - now).try_into().unwrap();
let block = Block::new(
Vec::new(),
ObjectDeps::ObjectIdList(Vec::new()),
Some(expiry),
[i].to_vec(),
None,
);
let block_id = store.put(&block).unwrap();
log_debug!("#{} -> objId {:?}", i, block_id);
block_ids.push(block_id);
i += 1;
}
store.remove_expired().unwrap();
assert!(store.get(block_ids.get(0).unwrap()).is_err());
assert!(store.get(block_ids.get(1).unwrap()).is_err());
assert!(store.get(block_ids.get(2).unwrap()).is_err());
assert!(store.get(block_ids.get(3).unwrap()).is_err());
assert!(store.get(block_ids.get(4).unwrap()).is_err());
assert!(store.get(block_ids.get(5).unwrap()).is_err());
}
#[test]
pub fn test_remove_empty_expired() {
let path_str = "test-env";
let root = Builder::new().prefix(path_str).tempdir().unwrap();
let key: [u8; 32] = [0; 32];
fs::create_dir_all(root.path()).unwrap();
log_debug!("{}", root.path().to_str().unwrap());
let store = LmdbRepoStore::open(root.path(), key).unwrap();
store.remove_expired().unwrap();
}
#[test]
pub fn test_store_block() {
let path_str = "test-env";
let root = Builder::new().prefix(path_str).tempdir().unwrap();
let key: [u8; 32] = [0; 32];
fs::create_dir_all(root.path()).unwrap();
log_debug!("{}", root.path().to_str().unwrap());
let mut store = LmdbRepoStore::open(root.path(), key).unwrap();
let block = Block::new(
Vec::new(),
ObjectDeps::ObjectIdList(Vec::new()),
None,
b"abc".to_vec(),
None,
);
let block_id = store.put(&block).unwrap();
assert_eq!(block_id, block.id());
log_debug!("ObjectId: {:?}", block_id);
assert_eq!(
block_id,
Digest::Blake3Digest32([
155, 83, 186, 17, 95, 10, 80, 31, 111, 24, 250, 64, 8, 145, 71, 193, 103, 246, 202,
28, 202, 144, 63, 65, 85, 229, 136, 85, 202, 34, 13, 85
])
);
let block_res = store.get(&block_id).unwrap();
log_debug!("Block: {:?}", block_res);
assert_eq!(block_res.id(), block.id());
}
#[test]
pub fn test_lmdb() {
let path_str = "test-env";
let root = Builder::new().prefix(path_str).tempdir().unwrap();
// we set an encryption key with all zeros... for test purpose only ;)
let key: [u8; 32] = [0; 32];
{
fs::create_dir_all(root.path()).unwrap();
log_debug!("{}", root.path().to_str().unwrap());
let mut manager = Manager::<LmdbEnvironment>::singleton().write().unwrap();
let shared_rkv = manager
.get_or_create(root.path(), |path| {
// Rkv::new::<Lmdb>(path) // use this instead to disable encryption
Rkv::with_encryption_key_and_mapsize::<Lmdb>(path, key, 1 * 1024 * 1024 * 1024)
})
.unwrap();
let env = shared_rkv.read().unwrap();
log_debug!("LMDB Version: {}", env.version());
let store = env.open_single("testdb", StoreOptions::create()).unwrap();
{
// Use a write transaction to mutate the store via a `Writer`. There can be only
// one writer for a given environment, so opening a second one will block until
// the first completes.
let mut writer = env.write().unwrap();
// Keys are `AsRef<[u8]>`, while values are `Value` enum instances. Use the `Blob`
// variant to store arbitrary collections of bytes. Putting data returns a
// `Result<(), StoreError>`, where StoreError is an enum identifying the reason
// for a failure.
// store.put(&mut writer, "int", &Value::I64(1234)).unwrap();
// store
// .put(&mut writer, "uint", &Value::U64(1234_u64))
// .unwrap();
// store
// .put(&mut writer, "float", &Value::F64(1234.0.into()))
// .unwrap();
// store
// .put(&mut writer, "instant", &Value::Instant(1528318073700))
// .unwrap();
// store
// .put(&mut writer, "boolean", &Value::Bool(true))
// .unwrap();
// store
// .put(&mut writer, "string", &Value::Str("Héllo, wörld!"))
// .unwrap();
// store
// .put(
// &mut writer,
// "json",
// &Value::Json(r#"{"foo":"bar", "number": 1}"#),
// )
// .unwrap();
const EXTRA: usize = 2095; // + 4096 * 524280 + 0;
let key: [u8; 33] = [0; 33];
let key2: [u8; 33] = [2; 33];
let key3: [u8; 33] = [3; 33];
let key4: [u8; 33] = [4; 33];
//let value: [u8; 1977 + EXTRA] = [1; 1977 + EXTRA];
let value = vec![1; 1977 + EXTRA];
let value2: [u8; 1977 + 1] = [1; 1977 + 1];
let value4: [u8; 953 + 0] = [1; 953 + 0];
store.put(&mut writer, key, &Value::Blob(&value2)).unwrap();
store.put(&mut writer, key2, &Value::Blob(&value2)).unwrap();
// store.put(&mut writer, key3, &Value::Blob(&value)).unwrap();
// store.put(&mut writer, key4, &Value::Blob(&value4)).unwrap();
// You must commit a write transaction before the writer goes out of scope, or the
// transaction will abort and the data won't persist.
writer.commit().unwrap();
let reader = env.read().expect("reader");
let stat = store.stat(&reader).unwrap();
log_debug!("LMDB stat page_size : {}", stat.page_size());
log_debug!("LMDB stat depth : {}", stat.depth());
log_debug!("LMDB stat branch_pages : {}", stat.branch_pages());
log_debug!("LMDB stat leaf_pages : {}", stat.leaf_pages());
log_debug!("LMDB stat overflow_pages : {}", stat.overflow_pages());
log_debug!("LMDB stat entries : {}", stat.entries());
}
// {
// // Use a read transaction to query the store via a `Reader`. There can be multiple
// // concurrent readers for a store, and readers never block on a writer nor other
// // readers.
// let reader = env.read().expect("reader");
// // Keys are `AsRef<u8>`, and the return value is `Result<Option<Value>, StoreError>`.
// // log_debug!("Get int {:?}", store.get(&reader, "int").unwrap());
// // log_debug!("Get uint {:?}", store.get(&reader, "uint").unwrap());
// // log_debug!("Get float {:?}", store.get(&reader, "float").unwrap());
// // log_debug!("Get instant {:?}", store.get(&reader, "instant").unwrap());
// // log_debug!("Get boolean {:?}", store.get(&reader, "boolean").unwrap());
// // log_debug!("Get string {:?}", store.get(&reader, "string").unwrap());
// // log_debug!("Get json {:?}", store.get(&reader, "json").unwrap());
// log_debug!("Get blob {:?}", store.get(&reader, "blob").unwrap());
// // Retrieving a non-existent value returns `Ok(None)`.
// log_debug!(
// "Get non-existent value {:?}",
// store.get(&reader, "non-existent").unwrap()
// );
// // A read transaction will automatically close once the reader goes out of scope,
// // so isn't necessary to close it explicitly, although you can do so by calling
// // `Reader.abort()`.
// }
// {
// // Aborting a write transaction rolls back the change(s).
// let mut writer = env.write().unwrap();
// store.put(&mut writer, "foo", &Value::Blob(b"bar")).unwrap();
// writer.abort();
// let reader = env.read().expect("reader");
// log_debug!(
// "It should be None! ({:?})",
// store.get(&reader, "foo").unwrap()
// );
// }
// {
// // Explicitly aborting a transaction is not required unless an early abort is
// // desired, since both read and write transactions will implicitly be aborted once
// // they go out of scope.
// {
// let mut writer = env.write().unwrap();
// store.put(&mut writer, "foo", &Value::Blob(b"bar")).unwrap();
// }
// let reader = env.read().expect("reader");
// log_debug!(
// "It should be None! ({:?})",
// store.get(&reader, "foo").unwrap()
// );
// }
// {
// // Deleting a key/value pair also requires a write transaction.
// let mut writer = env.write().unwrap();
// store.put(&mut writer, "foo", &Value::Blob(b"bar")).unwrap();
// store.put(&mut writer, "bar", &Value::Blob(b"baz")).unwrap();
// store.delete(&mut writer, "foo").unwrap();
// // A write transaction also supports reading, and the version of the store that it
// // reads includes the changes it has made regardless of the commit state of that
// // transaction.
// // In the code above, "foo" and "bar" were put into the store, then "foo" was
// // deleted so only "bar" will return a result when the database is queried via the
// // writer.
// log_debug!(
// "It should be None! ({:?})",
// store.get(&writer, "foo").unwrap()
// );
// log_debug!("Get bar ({:?})", store.get(&writer, "bar").unwrap());
// // But a reader won't see that change until the write transaction is committed.
// {
// let reader = env.read().expect("reader");
// log_debug!("Get foo {:?}", store.get(&reader, "foo").unwrap());
// log_debug!("Get bar {:?}", store.get(&reader, "bar").unwrap());
// }
// writer.commit().unwrap();
// {
// let reader = env.read().expect("reader");
// log_debug!(
// "It should be None! ({:?})",
// store.get(&reader, "foo").unwrap()
// );
// log_debug!("Get bar {:?}", store.get(&reader, "bar").unwrap());
// }
// // Committing a transaction consumes the writer, preventing you from reusing it by
// // failing at compile time with an error. This line would report "error[E0382]:
// // borrow of moved value: `writer`".
// // store.put(&mut writer, "baz", &Value::Str("buz")).unwrap();
// }
// {
// // Clearing all the entries in the store with a write transaction.
// {
// let mut writer = env.write().unwrap();
// store.put(&mut writer, "foo", &Value::Blob(b"bar")).unwrap();
// store.put(&mut writer, "bar", &Value::Blob(b"baz")).unwrap();
// writer.commit().unwrap();
// }
// // {
// // let mut writer = env.write().unwrap();
// // store.clear(&mut writer).unwrap();
// // writer.commit().unwrap();
// // }
// // {
// // let reader = env.read().expect("reader");
// // log_debug!(
// // "It should be None! ({:?})",
// // store.get(&reader, "foo").unwrap()
// // );
// // log_debug!(
// // "It should be None! ({:?})",
// // store.get(&reader, "bar").unwrap()
// // );
// // }
// }
let stat = env.stat().unwrap();
let info = env.info().unwrap();
log_debug!("LMDB info map_size : {}", info.map_size());
log_debug!("LMDB info last_pgno : {}", info.last_pgno());
log_debug!("LMDB info last_txnid : {}", info.last_txnid());
log_debug!("LMDB info max_readers : {}", info.max_readers());
log_debug!("LMDB info num_readers : {}", info.num_readers());
log_debug!("LMDB stat page_size : {}", stat.page_size());
log_debug!("LMDB stat depth : {}", stat.depth());
log_debug!("LMDB stat branch_pages : {}", stat.branch_pages());
log_debug!("LMDB stat leaf_pages : {}", stat.leaf_pages());
log_debug!("LMDB stat overflow_pages : {}", stat.overflow_pages());
log_debug!("LMDB stat entries : {}", stat.entries());
}
// We reopen the env and data to see if it was well saved to disk.
{
let mut manager = Manager::<LmdbEnvironment>::singleton().write().unwrap();
let shared_rkv = manager
.get_or_create(root.path(), |path| {
//Rkv::new::<Lmdb>(path) // use this instead to disable encryption
Rkv::with_encryption_key_and_mapsize::<Lmdb>(path, key, 1 * 1024 * 1024 * 1024)
})
.unwrap();
let env = shared_rkv.read().unwrap();
log_debug!("LMDB Version: {}", env.version());
let mut store = env.open_single("testdb", StoreOptions::default()).unwrap(); //StoreOptions::create()
{
let reader = env.read().expect("reader");
log_debug!(
"It should be baz! ({:?})",
store.get(&reader, "bar").unwrap()
);
}
}
// Here the database and environment is closed, but the files are still present in the temp directory.
// uncomment this if you need time to copy them somewhere for analysis, before the temp folder get destroyed
//thread::sleep(Duration::from_millis(20000));
}
}
Loading…
Cancel
Save