Add initial checkpoints implementation and tests

Reformat code

Extract log_size_for_flush into constant

Fix test nits

Rename Checkpoint to CheckpointBuilder and fix test nit

Add comment to LOG_SIZE_FOR_FLUSH and fix CheckpointBuilder comment

Rename CheckpointBuilder to Checkpoint and create_checkpoint to save_to

Rename save_to to create_checkpoint to be consistent with C API naming
master
Eugene P 6 years ago
parent 0475bbeaea
commit 0caadb7d7e
  1. 80
      src/checkpoint.rs
  2. 1
      src/lib.rs
  3. 113
      tests/test_checkpoint.rs

@ -0,0 +1,80 @@
// Copyright 2018 Eugene P.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
///! Implementation of bindings to RocksDB Checkpoint[1] API
///
/// [1]: https://github.com/facebook/rocksdb/wiki/Checkpoints
use {DB, Error};
use ffi;
use std::ffi::CString;
use std::path::Path;
/// Undocumented parameter for `ffi::rocksdb_checkpoint_create` function. Zero by default.
const LOG_SIZE_FOR_FLUSH: u64 = 0_u64;
/// Database's checkpoint object.
/// Used to create checkpoints of the specified DB from time to time.
pub struct Checkpoint {
inner: *mut ffi::rocksdb_checkpoint_t,
}
impl Checkpoint {
/// Creates new checkpoint object for specific DB.
///
/// Does not actually produce checkpoints, call `.create_checkpoint()` method to produce
/// a DB checkpoint.
pub fn new(db: &DB) -> Result<Checkpoint, Error> {
let checkpoint: *mut ffi::rocksdb_checkpoint_t;
unsafe { checkpoint = ffi_try!(ffi::rocksdb_checkpoint_object_create(db.inner,)) };
if checkpoint.is_null() {
return Err(Error::new("Could not create checkpoint object.".to_owned()));
}
Ok(Checkpoint {
inner: checkpoint,
})
}
/// Creates new physical DB checkpoint in directory specified by `path`.
pub fn create_checkpoint<P: AsRef<Path>>(&self, path: P) -> Result<(), Error> {
let path = path.as_ref();
let cpath = match CString::new(path.to_string_lossy().as_bytes()) {
Ok(c) => c,
Err(_) => {
return Err(Error::new(
"Failed to convert path to CString when creating DB checkpoint"
.to_owned(),
));
}
};
unsafe {
ffi_try!(ffi::rocksdb_checkpoint_create(self.inner, cpath.as_ptr(), LOG_SIZE_FOR_FLUSH,));
Ok(())
}
}
}
impl Drop for Checkpoint {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_checkpoint_object_destroy(self.inner);
}
}
}

@ -53,6 +53,7 @@ extern crate librocksdb_sys as ffi;
mod ffi_util; mod ffi_util;
pub mod backup; pub mod backup;
pub mod checkpoint;
mod comparator; mod comparator;
pub mod merge_operator; pub mod merge_operator;
pub mod compaction_filter; pub mod compaction_filter;

@ -0,0 +1,113 @@
// Copyright 2018 Eugene P.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
extern crate rocksdb;
use rocksdb::{checkpoint::Checkpoint, DB, Options};
use std::fs::remove_dir_all;
#[test]
pub fn test_single_checkpoint() {
const PATH_PREFIX: &str = "_rust_rocksdb_cp_single_";
// Create DB with some data
let db_path = format!("{}db1", PATH_PREFIX);
let _ = remove_dir_all(&db_path);
let mut opts = Options::default();
opts.create_if_missing(true);
let db = DB::open(&opts, &db_path).unwrap();
db.put(b"k1", b"v1").unwrap();
db.put(b"k2", b"v2").unwrap();
db.put(b"k3", b"v3").unwrap();
db.put(b"k4", b"v4").unwrap();
// Create checkpoint
let cp1 = Checkpoint::new(&db).unwrap();
let cp1_path = format!("{}cp1", PATH_PREFIX);
let _ = remove_dir_all(&cp1_path);
cp1.create_checkpoint(&cp1_path).unwrap();
// Verify checkpoint
let cp = DB::open_default(&cp1_path).unwrap();
assert_eq!(*cp.get(b"k1").unwrap().unwrap(), *b"v1");
assert_eq!(*cp.get(b"k2").unwrap().unwrap(), *b"v2");
assert_eq!(*cp.get(b"k3").unwrap().unwrap(), *b"v3");
assert_eq!(*cp.get(b"k4").unwrap().unwrap(), *b"v4");
let _ = remove_dir_all(&db_path);
let _ = remove_dir_all(&cp1_path);
}
#[test]
pub fn test_multi_checkpoints() {
const PATH_PREFIX: &str = "_rust_rocksdb_cp_multi_";
// Create DB with some data
let db_path = format!("{}db1", PATH_PREFIX);
let _ = remove_dir_all(&db_path);
let mut opts = Options::default();
opts.create_if_missing(true);
let db = DB::open(&opts, &db_path).unwrap();
db.put(b"k1", b"v1").unwrap();
db.put(b"k2", b"v2").unwrap();
db.put(b"k3", b"v3").unwrap();
db.put(b"k4", b"v4").unwrap();
// Create first checkpoint
let cp1 = Checkpoint::new(&db).unwrap();
let cp1_path = format!("{}cp1", PATH_PREFIX);
let _ = remove_dir_all(&cp1_path);
cp1.create_checkpoint(&cp1_path).unwrap();
// Verify checkpoint
let cp = DB::open_default(&cp1_path).unwrap();
assert_eq!(*cp.get(b"k1").unwrap().unwrap(), *b"v1");
assert_eq!(*cp.get(b"k2").unwrap().unwrap(), *b"v2");
assert_eq!(*cp.get(b"k3").unwrap().unwrap(), *b"v3");
assert_eq!(*cp.get(b"k4").unwrap().unwrap(), *b"v4");
let _ = remove_dir_all(&cp1_path);
// Change some existing keys
db.put(b"k1", b"modified").unwrap();
db.put(b"k2", b"changed").unwrap();
// Add some new keys
db.put(b"k5", b"v5").unwrap();
db.put(b"k6", b"v6").unwrap();
// Create another checkpoint
let cp2 = Checkpoint::new(&db).unwrap();
let cp2_path = format!("{}cp2", PATH_PREFIX);
let _ = remove_dir_all(&cp2_path);
cp2.create_checkpoint(&cp2_path).unwrap();
// Verify second checkpoint
let cp = DB::open_default(&cp2_path).unwrap();
assert_eq!(*cp.get(b"k1").unwrap().unwrap(), *b"modified");
assert_eq!(*cp.get(b"k2").unwrap().unwrap(), *b"changed");
assert_eq!(*cp.get(b"k5").unwrap().unwrap(), *b"v5");
assert_eq!(*cp.get(b"k6").unwrap().unwrap(), *b"v6");
let _ = remove_dir_all(&db_path);
let _ = remove_dir_all(&cp2_path);
}
Loading…
Cancel
Save