From 0caadb7d7e3b64c7d0081ade81a1bf8cd1ab9edd Mon Sep 17 00:00:00 2001 From: Eugene P Date: Tue, 16 Oct 2018 15:48:25 +0300 Subject: [PATCH] Add initial checkpoints implementation and tests Reformat code Extract log_size_for_flush into constant Fix test nits Rename Checkpoint to CheckpointBuilder and fix test nit Add comment to LOG_SIZE_FOR_FLUSH and fix CheckpointBuilder comment Rename CheckpointBuilder to Checkpoint and create_checkpoint to save_to Rename save_to to create_checkpoint to be consistent with C API naming --- src/checkpoint.rs | 80 +++++++++++++++++++++++++++ src/lib.rs | 1 + tests/test_checkpoint.rs | 113 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 194 insertions(+) create mode 100644 src/checkpoint.rs create mode 100644 tests/test_checkpoint.rs diff --git a/src/checkpoint.rs b/src/checkpoint.rs new file mode 100644 index 0000000..68af220 --- /dev/null +++ b/src/checkpoint.rs @@ -0,0 +1,80 @@ +// Copyright 2018 Eugene P. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +///! Implementation of bindings to RocksDB Checkpoint[1] API +/// +/// [1]: https://github.com/facebook/rocksdb/wiki/Checkpoints + +use {DB, Error}; +use ffi; +use std::ffi::CString; +use std::path::Path; + +/// Undocumented parameter for `ffi::rocksdb_checkpoint_create` function. Zero by default. +const LOG_SIZE_FOR_FLUSH: u64 = 0_u64; + +/// Database's checkpoint object. +/// Used to create checkpoints of the specified DB from time to time. +pub struct Checkpoint { + inner: *mut ffi::rocksdb_checkpoint_t, +} + +impl Checkpoint { + /// Creates new checkpoint object for specific DB. + /// + /// Does not actually produce checkpoints, call `.create_checkpoint()` method to produce + /// a DB checkpoint. + pub fn new(db: &DB) -> Result { + let checkpoint: *mut ffi::rocksdb_checkpoint_t; + + unsafe { checkpoint = ffi_try!(ffi::rocksdb_checkpoint_object_create(db.inner,)) }; + + if checkpoint.is_null() { + return Err(Error::new("Could not create checkpoint object.".to_owned())); + } + + Ok(Checkpoint { + inner: checkpoint, + }) + } + + /// Creates new physical DB checkpoint in directory specified by `path`. + pub fn create_checkpoint>(&self, path: P) -> Result<(), Error> { + let path = path.as_ref(); + let cpath = match CString::new(path.to_string_lossy().as_bytes()) { + Ok(c) => c, + Err(_) => { + return Err(Error::new( + "Failed to convert path to CString when creating DB checkpoint" + .to_owned(), + )); + } + }; + + unsafe { + ffi_try!(ffi::rocksdb_checkpoint_create(self.inner, cpath.as_ptr(), LOG_SIZE_FOR_FLUSH,)); + + Ok(()) + } + } +} + +impl Drop for Checkpoint { + fn drop(&mut self) { + unsafe { + ffi::rocksdb_checkpoint_object_destroy(self.inner); + } + } +} diff --git a/src/lib.rs b/src/lib.rs index b8193bf..fa191b6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,6 +53,7 @@ extern crate librocksdb_sys as ffi; mod ffi_util; pub mod backup; +pub mod checkpoint; mod comparator; pub mod merge_operator; pub mod compaction_filter; diff --git a/tests/test_checkpoint.rs b/tests/test_checkpoint.rs new file mode 100644 index 0000000..cfddb03 --- /dev/null +++ b/tests/test_checkpoint.rs @@ -0,0 +1,113 @@ +// Copyright 2018 Eugene P. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +extern crate rocksdb; + +use rocksdb::{checkpoint::Checkpoint, DB, Options}; +use std::fs::remove_dir_all; + +#[test] +pub fn test_single_checkpoint() { + const PATH_PREFIX: &str = "_rust_rocksdb_cp_single_"; + + // Create DB with some data + let db_path = format!("{}db1", PATH_PREFIX); + + let _ = remove_dir_all(&db_path); + + let mut opts = Options::default(); + opts.create_if_missing(true); + let db = DB::open(&opts, &db_path).unwrap(); + + db.put(b"k1", b"v1").unwrap(); + db.put(b"k2", b"v2").unwrap(); + db.put(b"k3", b"v3").unwrap(); + db.put(b"k4", b"v4").unwrap(); + + // Create checkpoint + let cp1 = Checkpoint::new(&db).unwrap(); + let cp1_path = format!("{}cp1", PATH_PREFIX); + let _ = remove_dir_all(&cp1_path); + cp1.create_checkpoint(&cp1_path).unwrap(); + + // Verify checkpoint + let cp = DB::open_default(&cp1_path).unwrap(); + + assert_eq!(*cp.get(b"k1").unwrap().unwrap(), *b"v1"); + assert_eq!(*cp.get(b"k2").unwrap().unwrap(), *b"v2"); + assert_eq!(*cp.get(b"k3").unwrap().unwrap(), *b"v3"); + assert_eq!(*cp.get(b"k4").unwrap().unwrap(), *b"v4"); + + let _ = remove_dir_all(&db_path); + let _ = remove_dir_all(&cp1_path); +} + +#[test] +pub fn test_multi_checkpoints() { + const PATH_PREFIX: &str = "_rust_rocksdb_cp_multi_"; + + // Create DB with some data + let db_path = format!("{}db1", PATH_PREFIX); + let _ = remove_dir_all(&db_path); + + let mut opts = Options::default(); + opts.create_if_missing(true); + let db = DB::open(&opts, &db_path).unwrap(); + + db.put(b"k1", b"v1").unwrap(); + db.put(b"k2", b"v2").unwrap(); + db.put(b"k3", b"v3").unwrap(); + db.put(b"k4", b"v4").unwrap(); + + // Create first checkpoint + let cp1 = Checkpoint::new(&db).unwrap(); + let cp1_path = format!("{}cp1", PATH_PREFIX); + let _ = remove_dir_all(&cp1_path); + cp1.create_checkpoint(&cp1_path).unwrap(); + + // Verify checkpoint + let cp = DB::open_default(&cp1_path).unwrap(); + + assert_eq!(*cp.get(b"k1").unwrap().unwrap(), *b"v1"); + assert_eq!(*cp.get(b"k2").unwrap().unwrap(), *b"v2"); + assert_eq!(*cp.get(b"k3").unwrap().unwrap(), *b"v3"); + assert_eq!(*cp.get(b"k4").unwrap().unwrap(), *b"v4"); + + let _ = remove_dir_all(&cp1_path); + + // Change some existing keys + db.put(b"k1", b"modified").unwrap(); + db.put(b"k2", b"changed").unwrap(); + + // Add some new keys + db.put(b"k5", b"v5").unwrap(); + db.put(b"k6", b"v6").unwrap(); + + // Create another checkpoint + let cp2 = Checkpoint::new(&db).unwrap(); + let cp2_path = format!("{}cp2", PATH_PREFIX); + let _ = remove_dir_all(&cp2_path); + cp2.create_checkpoint(&cp2_path).unwrap(); + + // Verify second checkpoint + let cp = DB::open_default(&cp2_path).unwrap(); + + assert_eq!(*cp.get(b"k1").unwrap().unwrap(), *b"modified"); + assert_eq!(*cp.get(b"k2").unwrap().unwrap(), *b"changed"); + assert_eq!(*cp.get(b"k5").unwrap().unwrap(), *b"v5"); + assert_eq!(*cp.get(b"k6").unwrap().unwrap(), *b"v6"); + + let _ = remove_dir_all(&db_path); + let _ = remove_dir_all(&cp2_path); +}