Adds a versioning system to the storage encoding

pull/46/head
Tpt 4 years ago
parent f9d97a2296
commit 65b763b09a
  1. 2
      lib/src/store/mod.rs
  2. 31
      lib/src/store/rocksdb.rs
  3. 30
      lib/src/store/sled.rs

@ -32,6 +32,8 @@ use std::io;
use std::io::{BufRead, Write}; use std::io::{BufRead, Write};
use std::iter::Iterator; use std::iter::Iterator;
const LATEST_STORAGE_VERSION: u64 = 0;
pub(crate) trait ReadableEncodedStore: StrLookup { pub(crate) trait ReadableEncodedStore: StrLookup {
type QuadsIter: Iterator<Item = Result<EncodedQuad<Self::StrId>, Self::Error>> + 'static; type QuadsIter: Iterator<Item = Result<EncodedQuad<Self::StrId>, Self::Error>> + 'static;

@ -10,7 +10,7 @@ use crate::store::numeric_encoder::{
}; };
use crate::store::{ use crate::store::{
dump_dataset, dump_graph, get_encoded_quad_pattern, load_dataset, load_graph, dump_dataset, dump_graph, get_encoded_quad_pattern, load_dataset, load_graph,
ReadableEncodedStore, WritableEncodedStore, ReadableEncodedStore, WritableEncodedStore, LATEST_STORAGE_VERSION,
}; };
use rocksdb::*; use rocksdb::*;
use std::collections::HashMap; use std::collections::HashMap;
@ -89,9 +89,34 @@ impl RocksDbStore {
options.create_missing_column_families(true); options.create_missing_column_families(true);
options.set_compaction_style(DBCompactionStyle::Universal); options.set_compaction_style(DBCompactionStyle::Universal);
Ok(Self { let this = Self {
db: Arc::new(DB::open_cf(&options, path, &COLUMN_FAMILIES).map_err(map_err)?), db: Arc::new(DB::open_cf(&options, path, &COLUMN_FAMILIES).map_err(map_err)?),
}) };
let version = this.ensure_version()?;
if version != LATEST_STORAGE_VERSION {
return Err(invalid_data_error(format!(
"The RocksDB database is still using the encoding version {}, please upgrade it",
version
)));
}
Ok(this)
}
fn ensure_version(&self) -> Result<u64, io::Error> {
Ok(
if let Some(version) = self.db.get("oxversion").map_err(map_err)? {
let mut buffer = [0; 8];
buffer.copy_from_slice(&version);
u64::from_be_bytes(buffer)
} else {
self.db
.put("oxversion", &LATEST_STORAGE_VERSION.to_be_bytes())
.map_err(map_err)?;
LATEST_STORAGE_VERSION
},
)
} }
/// Executes a [SPARQL 1.1 query](https://www.w3.org/TR/sparql11-query/). /// Executes a [SPARQL 1.1 query](https://www.w3.org/TR/sparql11-query/).

@ -10,13 +10,13 @@ use crate::store::numeric_encoder::{
}; };
use crate::store::{ use crate::store::{
dump_dataset, dump_graph, get_encoded_quad_pattern, load_dataset, load_graph, dump_dataset, dump_graph, get_encoded_quad_pattern, load_dataset, load_graph,
ReadableEncodedStore, StoreOrParseError, WritableEncodedStore, ReadableEncodedStore, StoreOrParseError, WritableEncodedStore, LATEST_STORAGE_VERSION,
}; };
use sled::transaction::{ use sled::transaction::{
ConflictableTransactionError, TransactionError, Transactional, TransactionalTree, ConflictableTransactionError, TransactionError, Transactional, TransactionalTree,
UnabortableTransactionError, UnabortableTransactionError,
}; };
use sled::{Config, Iter, Tree}; use sled::{Config, Db, Iter, Tree};
use std::convert::TryInto; use std::convert::TryInto;
use std::error::Error; use std::error::Error;
use std::io::{BufRead, Write}; use std::io::{BufRead, Write};
@ -61,6 +61,7 @@ use std::{fmt, io, str};
/// ``` /// ```
#[derive(Clone)] #[derive(Clone)]
pub struct SledStore { pub struct SledStore {
default: Db,
id2str: Tree, id2str: Tree,
spog: Tree, spog: Tree,
posg: Tree, posg: Tree,
@ -91,7 +92,8 @@ impl SledStore {
fn do_open(config: &Config) -> Result<Self, io::Error> { fn do_open(config: &Config) -> Result<Self, io::Error> {
let db = config.open()?; let db = config.open()?;
Ok(Self { let this = Self {
default: db.clone(),
id2str: db.open_tree("id2str")?, id2str: db.open_tree("id2str")?,
spog: db.open_tree("spog")?, spog: db.open_tree("spog")?,
posg: db.open_tree("posg")?, posg: db.open_tree("posg")?,
@ -102,6 +104,28 @@ impl SledStore {
dspo: db.open_tree("dspo")?, dspo: db.open_tree("dspo")?,
dpos: db.open_tree("dpos")?, dpos: db.open_tree("dpos")?,
dosp: db.open_tree("dosp")?, dosp: db.open_tree("dosp")?,
};
let version = this.ensure_version()?;
if version != LATEST_STORAGE_VERSION {
return Err(invalid_data_error(format!(
"The Sled database is still using the encoding version {}, please upgrade it",
version
)));
}
Ok(this)
}
fn ensure_version(&self) -> Result<u64, io::Error> {
Ok(if let Some(version) = self.default.get("oxversion")? {
let mut buffer = [0; 8];
buffer.copy_from_slice(&version);
u64::from_be_bytes(buffer)
} else {
self.default
.insert("oxversion", &LATEST_STORAGE_VERSION.to_be_bytes())?;
LATEST_STORAGE_VERSION
}) })
} }

Loading…
Cancel
Save