Change a column family storing

master
Oleksandr Anyshchenko 5 years ago
parent bf17f0ad1c
commit 54ff3db8ec
  1. 44
      src/backup.rs
  2. 115
      src/db.rs
  3. 10
      src/lib.rs
  4. 58
      tests/test_backup.rs
  5. 10
      tests/test_column_family.rs
  6. 14
      tests/test_property.rs
  7. 6
      tests/test_slice_transform.rs

@ -202,47 +202,3 @@ impl Drop for RestoreOptions {
}
}
}
#[test]
fn backup_restore() {
use db::DBVector;
use Options;
// create backup
let path = "_rust_rocksdb_backup_restore_test";
{
let db = DB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok());
let r: Result<Option<DBVector>, Error> = db.get(b"k1");
assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111");
let backup_path = "_rust_rocksdb_backup_path";
{
let backup_opts = BackupEngineOptions::default();
let mut backup_engine = BackupEngine::open(&backup_opts, &backup_path).unwrap();
let r = backup_engine.create_new_backup(&db);
assert!(r.is_ok());
let restore_path = "_rust_rocksdb_restore_from_backup_path";
{
let mut restore_option = RestoreOptions::default();
restore_option.set_keep_log_files(true); // true to keep log files
let restore_status = backup_engine.restore_from_latest_backup(
&restore_path,
&restore_path,
&restore_option,
);
assert!(restore_status.is_ok());
let db_restore = DB::open_default(restore_path).unwrap();
let r: Result<Option<DBVector>, Error> = db_restore.get(b"k1");
assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111");
}
assert!(DB::destroy(&Options::default(), restore_path).is_ok());
}
assert!(DB::destroy(&Options::default(), backup_path).is_ok());
}
assert!(DB::destroy(&Options::default(), path).is_ok());
}

@ -28,7 +28,6 @@ use std::path::Path;
use std::ptr;
use std::slice;
use std::str;
use std::sync::{Arc, RwLock};
unsafe impl Send for DB {}
unsafe impl Sync for DB {}
@ -221,7 +220,7 @@ impl<'a> DBRawIterator<'a> {
fn new_cf(
db: &DB,
cf_handle: ColumnFamily,
cf_handle: &ColumnFamily,
readopts: &ReadOptions,
) -> Result<DBRawIterator<'a>, Error> {
unsafe {
@ -476,7 +475,7 @@ impl<'a> DBIterator<'a> {
fn new_cf(
db: &DB,
cf_handle: ColumnFamily,
cf_handle: &ColumnFamily,
readopts: &ReadOptions,
mode: IteratorMode,
) -> Result<DBIterator<'a>, Error> {
@ -566,7 +565,7 @@ impl<'a> Snapshot<'a> {
pub fn iterator_cf(
&self,
cf_handle: ColumnFamily,
cf_handle: &ColumnFamily,
mode: IteratorMode,
) -> Result<DBIterator, Error> {
let readopts = ReadOptions::default();
@ -580,7 +579,7 @@ impl<'a> Snapshot<'a> {
pub fn iterator_cf_opt(
&self,
cf_handle: ColumnFamily,
cf_handle: &ColumnFamily,
mut readopts: ReadOptions,
mode: IteratorMode,
) -> Result<DBIterator, Error> {
@ -593,7 +592,7 @@ impl<'a> Snapshot<'a> {
self.raw_iterator_opt(readopts)
}
pub fn raw_iterator_cf(&self, cf_handle: ColumnFamily) -> Result<DBRawIterator, Error> {
pub fn raw_iterator_cf(&self, cf_handle: &ColumnFamily) -> Result<DBRawIterator, Error> {
let readopts = ReadOptions::default();
self.raw_iterator_cf_opt(cf_handle, readopts)
}
@ -605,7 +604,7 @@ impl<'a> Snapshot<'a> {
pub fn raw_iterator_cf_opt(
&self,
cf_handle: ColumnFamily,
cf_handle: &ColumnFamily,
mut readopts: ReadOptions,
) -> Result<DBRawIterator, Error> {
readopts.set_snapshot(self);
@ -619,7 +618,7 @@ impl<'a> Snapshot<'a> {
pub fn get_cf<K: AsRef<[u8]>>(
&self,
cf: ColumnFamily,
cf: &ColumnFamily,
key: K,
) -> Result<Option<DBVector>, Error> {
let readopts = ReadOptions::default();
@ -637,7 +636,7 @@ impl<'a> Snapshot<'a> {
pub fn get_cf_opt<K: AsRef<[u8]>>(
&self,
cf: ColumnFamily,
cf: &ColumnFamily,
key: K,
mut readopts: ReadOptions,
) -> Result<Option<DBVector>, Error> {
@ -724,7 +723,7 @@ impl DB {
}
let db: *mut ffi::rocksdb_t;
let cf_map = Arc::new(RwLock::new(BTreeMap::new()));
let mut cf_map = BTreeMap::new();
if cfs.is_empty() {
unsafe {
@ -777,11 +776,8 @@ impl DB {
}
}
for (n, h) in cfs_v.iter().zip(cfhandles) {
cf_map
.write()
.map_err(|e| Error::new(e.to_string()))?
.insert(n.name.clone(), h);
for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
cf_map.insert(cf_desc.name.clone(), ColumnFamily { inner });
}
}
@ -908,7 +904,7 @@ impl DB {
pub fn get_cf_opt<K: AsRef<[u8]>>(
&self,
cf: ColumnFamily,
cf: &ColumnFamily,
key: K,
readopts: &ReadOptions,
) -> Result<Option<DBVector>, Error> {
@ -945,7 +941,7 @@ impl DB {
pub fn get_cf<K: AsRef<[u8]>>(
&self,
cf: ColumnFamily,
cf: &ColumnFamily,
key: K,
) -> Result<Option<DBVector>, Error> {
self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default())
@ -997,7 +993,7 @@ impl DB {
/// allows specifying ColumnFamily
pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
&self,
cf: ColumnFamily,
cf: &ColumnFamily,
key: K,
readopts: &ReadOptions,
) -> Result<Option<DBPinnableSlice>, Error> {
@ -1034,13 +1030,13 @@ impl DB {
/// leverages default options.
pub fn get_pinned_cf<K: AsRef<[u8]>>(
&self,
cf: ColumnFamily,
cf: &ColumnFamily,
key: K,
) -> Result<Option<DBPinnableSlice>, Error> {
self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
}
pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<ColumnFamily, Error> {
pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
let cname = match CString::new(name.as_ref().as_bytes()) {
Ok(c) => c,
Err(_) => {
@ -1051,35 +1047,23 @@ impl DB {
));
}
};
let cf = unsafe {
let cf_handle = ffi_try!(ffi::rocksdb_create_column_family(
unsafe {
let inner = ffi_try!(ffi::rocksdb_create_column_family(
self.inner,
opts.inner,
cname.as_ptr(),
));
self.cfs
.write()
.map_err(|e| Error::new(e.to_string()))?
.insert(name.as_ref().to_string(), cf_handle);
ColumnFamily {
inner: cf_handle,
db: PhantomData,
}
.insert(name.as_ref().to_string(), ColumnFamily { inner });
};
Ok(cf)
Ok(())
}
pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
if let Some(cf) = self
.cfs
.write()
.map_err(|e| Error::new(e.to_string()))?
.remove(name)
{
pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
if let Some(cf) = self.cfs.remove(name) {
unsafe {
ffi_try!(ffi::rocksdb_drop_column_family(self.inner, cf,));
ffi_try!(ffi::rocksdb_drop_column_family(self.inner, cf.inner,));
}
Ok(())
} else {
@ -1090,11 +1074,8 @@ impl DB {
}
/// Return the underlying column family handle.
pub fn cf_handle(&self, name: &str) -> Option<ColumnFamily> {
self.cfs.read().ok()?.get(name).map(|h| ColumnFamily {
inner: *h,
db: PhantomData,
})
pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
self.cfs.get(name)
}
pub fn iterator(&self, mode: IteratorMode) -> DBIterator {
@ -1110,7 +1091,7 @@ impl DB {
/// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions
pub fn iterator_cf_opt(
&self,
cf_handle: ColumnFamily,
cf_handle: &ColumnFamily,
readopts: &ReadOptions,
mode: IteratorMode,
) -> Result<DBIterator, Error> {
@ -1138,7 +1119,7 @@ impl DB {
pub fn iterator_cf(
&self,
cf_handle: ColumnFamily,
cf_handle: &ColumnFamily,
mode: IteratorMode,
) -> Result<DBIterator, Error> {
let opts = ReadOptions::default();
@ -1147,7 +1128,7 @@ impl DB {
pub fn full_iterator_cf(
&self,
cf_handle: ColumnFamily,
cf_handle: &ColumnFamily,
mode: IteratorMode,
) -> Result<DBIterator, Error> {
let mut opts = ReadOptions::default();
@ -1157,7 +1138,7 @@ impl DB {
pub fn prefix_iterator_cf<P: AsRef<[u8]>>(
&self,
cf_handle: ColumnFamily,
cf_handle: &ColumnFamily,
prefix: P,
) -> Result<DBIterator, Error> {
let mut opts = ReadOptions::default();
@ -1175,7 +1156,7 @@ impl DB {
DBRawIterator::new(self, &opts)
}
pub fn raw_iterator_cf(&self, cf_handle: ColumnFamily) -> Result<DBRawIterator, Error> {
pub fn raw_iterator_cf(&self, cf_handle: &ColumnFamily) -> Result<DBRawIterator, Error> {
let opts = ReadOptions::default();
DBRawIterator::new_cf(self, cf_handle, &opts)
}
@ -1207,7 +1188,7 @@ impl DB {
pub fn put_cf_opt<K, V>(
&self,
cf: ColumnFamily,
cf: &ColumnFamily,
key: K,
value: V,
writeopts: &WriteOptions,
@ -1256,7 +1237,7 @@ impl DB {
pub fn merge_cf_opt<K, V>(
&self,
cf: ColumnFamily,
cf: &ColumnFamily,
key: K,
value: V,
writeopts: &WriteOptions,
@ -1302,7 +1283,7 @@ impl DB {
pub fn delete_cf_opt<K: AsRef<[u8]>>(
&self,
cf: ColumnFamily,
cf: &ColumnFamily,
key: K,
writeopts: &WriteOptions,
) -> Result<(), Error> {
@ -1328,7 +1309,7 @@ impl DB {
self.put_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
}
pub fn put_cf<K, V>(&self, cf: ColumnFamily, key: K, value: V) -> Result<(), Error>
pub fn put_cf<K, V>(&self, cf: &ColumnFamily, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
@ -1344,7 +1325,7 @@ impl DB {
self.merge_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
}
pub fn merge_cf<K, V>(&self, cf: ColumnFamily, key: K, value: V) -> Result<(), Error>
pub fn merge_cf<K, V>(&self, cf: &ColumnFamily, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
@ -1356,7 +1337,7 @@ impl DB {
self.delete_opt(key.as_ref(), &WriteOptions::default())
}
pub fn delete_cf<K: AsRef<[u8]>>(&self, cf: ColumnFamily, key: K) -> Result<(), Error> {
pub fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), Error> {
self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default())
}
@ -1377,7 +1358,7 @@ impl DB {
pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
&self,
cf: ColumnFamily,
cf: &ColumnFamily,
start: Option<S>,
end: Option<E>,
) {
@ -1466,7 +1447,11 @@ impl DB {
///
/// For a full list of properties, see
/// https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634
pub fn property_value_cf(&self, cf: ColumnFamily, name: &str) -> Result<Option<String>, Error> {
pub fn property_value_cf(
&self,
cf: &ColumnFamily,
name: &str,
) -> Result<Option<String>, Error> {
let prop_name = match CString::new(name) {
Ok(c) => c,
Err(e) => {
@ -1522,7 +1507,7 @@ impl DB {
/// https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689
pub fn property_int_value_cf(
&self,
cf: ColumnFamily,
cf: &ColumnFamily,
name: &str,
) -> Result<Option<u64>, Error> {
match self.property_value_cf(cf, name) {
@ -1578,7 +1563,7 @@ impl WriteBatch {
}
}
pub fn put_cf<K, V>(&mut self, cf: ColumnFamily, key: K, value: V) -> Result<(), Error>
pub fn put_cf<K, V>(&mut self, cf: &ColumnFamily, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
@ -1619,7 +1604,7 @@ impl WriteBatch {
}
}
pub fn merge_cf<K, V>(&mut self, cf: ColumnFamily, key: K, value: V) -> Result<(), Error>
pub fn merge_cf<K, V>(&mut self, cf: &ColumnFamily, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
@ -1656,7 +1641,7 @@ impl WriteBatch {
}
}
pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: ColumnFamily, key: K) -> Result<(), Error> {
pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &ColumnFamily, key: K) -> Result<(), Error> {
let key = key.as_ref();
unsafe {
@ -1697,7 +1682,7 @@ impl WriteBatch {
/// keys exist in the range ["begin_key", "end_key").
pub fn delete_range_cf<K: AsRef<[u8]>>(
&mut self,
cf: ColumnFamily,
cf: &ColumnFamily,
from: K,
to: K,
) -> Result<(), Error> {
@ -1742,10 +1727,8 @@ impl Drop for WriteBatch {
impl Drop for DB {
fn drop(&mut self) {
unsafe {
if let Ok(cfs) = self.cfs.read() {
for cf in cfs.values() {
ffi::rocksdb_column_family_handle_destroy(*cf);
}
for cf in self.cfs.values() {
ffi::rocksdb_column_family_handle_destroy(cf.inner);
}
ffi::rocksdb_close(self.inner);
}

@ -81,16 +81,14 @@ pub use merge_operator::MergeOperands;
use std::collections::BTreeMap;
use std::error;
use std::fmt;
use std::marker::PhantomData;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
/// A RocksDB database.
///
/// See crate level documentation for a simple usage example.
pub struct DB {
inner: *mut ffi::rocksdb_t,
cfs: Arc<RwLock<BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>>>,
cfs: BTreeMap<String, ColumnFamily>,
path: PathBuf,
}
@ -283,10 +281,8 @@ pub struct WriteOptions {
/// An opaque type used to represent a column family. Returned from some functions, and used
/// in others
#[derive(Copy, Clone)]
pub struct ColumnFamily<'a> {
pub struct ColumnFamily {
inner: *mut ffi::rocksdb_column_family_handle_t,
db: PhantomData<&'a DB>,
}
unsafe impl<'a> Send for ColumnFamily<'a> {}
unsafe impl Send for ColumnFamily {}

@ -0,0 +1,58 @@
// Copyright 2019 Tyler Neely
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
extern crate rocksdb;
use rocksdb::{
backup::{BackupEngine, BackupEngineOptions, RestoreOptions},
Options, DB,
};
use std::fs;
#[test]
fn backup_restore() {
// create backup
let path = "_rust_rocksdb_backup_test";
let restore_path = "_rust_rocksdb_restore_from_backup_path";
let mut opts = Options::default();
{
opts.create_if_missing(true);
let db = DB::open(&opts, path).unwrap();
assert!(db.put(b"k1", b"v1111").is_ok());
let value = db.get(b"k1");
assert_eq!(value.unwrap().unwrap().as_ref(), b"v1111");
{
let backup_path = "_rust_rocksdb_backup_path";
let backup_opts = BackupEngineOptions::default();
let mut backup_engine = BackupEngine::open(&backup_opts, &backup_path).unwrap();
assert!(backup_engine.create_new_backup(&db).is_ok());
let mut restore_option = RestoreOptions::default();
restore_option.set_keep_log_files(true); // true to keep log files
let restore_status = backup_engine.restore_from_latest_backup(
&restore_path,
&restore_path,
&restore_option,
);
assert!(restore_status.is_ok());
let db_restore = DB::open_default(restore_path).unwrap();
let value = db_restore.get(b"k1");
assert_eq!(value.unwrap().unwrap().as_ref(), b"v1111");
assert!(fs::remove_dir_all(backup_path).is_ok());
}
}
assert!(DB::destroy(&opts, restore_path).is_ok());
assert!(DB::destroy(&opts, path).is_ok());
}

@ -27,10 +27,10 @@ fn test_column_family() {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_merge_operator("test operator", test_provided_merge, None);
let db = DB::open(&opts, &n).unwrap();
let mut db = DB::open(&opts, &n).unwrap();
let opts = Options::default();
match db.create_cf("cf1", &opts) {
Ok(_db) => println!("cf1 created successfully"),
Ok(()) => println!("cf1 created successfully"),
Err(e) => {
panic!("could not create column family: {}", e);
}
@ -80,7 +80,7 @@ fn test_column_family() {
{}
// should b able to drop a cf
{
let db = DB::open_cf(&Options::default(), &n, &["cf1"]).unwrap();
let mut db = DB::open_cf(&Options::default(), &n, &["cf1"]).unwrap();
match db.drop_cf("cf1") {
Ok(_) => println!("cf1 successfully dropped."),
Err(e) => panic!("failed to drop column family: {}", e),
@ -97,7 +97,7 @@ fn test_can_open_db_with_results_of_list_cf() {
{
let mut opts = Options::default();
opts.create_if_missing(true);
let db = DB::open(&opts, &n).unwrap();
let mut db = DB::open(&opts, &n).unwrap();
let opts = Options::default();
assert!(db.create_cf("cf1", &opts).is_ok());
@ -244,7 +244,7 @@ fn test_create_duplicate_column_family() {
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let db = match DB::open_cf(&opts, &n, &["cf1"]) {
let mut db = match DB::open_cf(&opts, &n, &["cf1"]) {
Ok(d) => d,
Err(e) => panic!("failed to create new column family: {}", e),
};

@ -34,8 +34,9 @@ fn property_cf_test() {
let n = DBPath::new("_rust_rocksdb_property_cf_test");
{
let opts = Options::default();
let db = DB::open_default(&n).unwrap();
let cf = db.create_cf("cf1", &opts).unwrap();
let mut db = DB::open_default(&n).unwrap();
db.create_cf("cf1", &opts).unwrap();
let cf = db.cf_handle("cf1").unwrap();
let value = db.property_value_cf(cf, "rocksdb.stats").unwrap().unwrap();
assert!(value.contains("Stats"));
@ -51,7 +52,7 @@ fn property_int_test() {
.property_int_value("rocksdb.estimate-live-data-size")
.unwrap();
assert!(value == Some(0));
assert_eq!(value, Some(0));
}
}
@ -60,12 +61,13 @@ fn property_int_cf_test() {
let n = DBPath::new("_rust_rocksdb_property_int_cf_test");
{
let opts = Options::default();
let db = DB::open_default(&n).unwrap();
let cf = db.create_cf("cf1", &opts).unwrap();
let mut db = DB::open_default(&n).unwrap();
db.create_cf("cf1", &opts).unwrap();
let cf = db.cf_handle("cf1").unwrap();
let total_keys = db
.property_int_value_cf(cf, "rocksdb.estimate-num-keys")
.unwrap();
assert!(total_keys == Some(0));
assert_eq!(total_keys, Some(0));
}
}

@ -6,14 +6,14 @@ use util::DBPath;
#[test]
pub fn test_slice_transform() {
let n = DBPath::new("_rust_rocksdb_slicetransform_test");
let db_path = DBPath::new("_rust_rocksdb_slicetransform_test");
{
let a1: Box<[u8]> = key(b"aaa1");
let a2: Box<[u8]> = key(b"aaa2");
let b1: Box<[u8]> = key(b"bbb1");
let b2: Box<[u8]> = key(b"bbb2");
fn first_three<'a>(k: &'a [u8]) -> &'a [u8] {
fn first_three(k: &[u8]) -> &[u8] {
&k[..3]
}
@ -23,7 +23,7 @@ pub fn test_slice_transform() {
opts.create_if_missing(true);
opts.set_prefix_extractor(prefix_extractor);
let db = DB::open(&opts, &n).unwrap();
let db = DB::open(&opts, &db_path).unwrap();
assert!(db.put(&*a1, &*a1).is_ok());
assert!(db.put(&*a2, &*a2).is_ok());

Loading…
Cancel
Save