From a8b77cfd8ac494557f350d982a659099417fe56b Mon Sep 17 00:00:00 2001 From: wqfish Date: Tue, 21 Apr 2020 00:35:45 -0700 Subject: [PATCH] Add support for open_for_read_only APIs (#402) --- src/db.rs | 194 +++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 172 insertions(+), 22 deletions(-) diff --git a/src/db.rs b/src/db.rs index b585b05..2405670 100644 --- a/src/db.rs +++ b/src/db.rs @@ -21,7 +21,7 @@ use crate::{ WriteBatch, WriteOptions, }; -use libc::{self, c_char, c_int, c_void, size_t}; +use libc::{self, c_char, c_int, c_uchar, c_void, size_t}; use std::collections::BTreeMap; use std::ffi::{CStr, CString}; use std::fmt; @@ -50,20 +50,35 @@ unsafe impl Send for DB {} // use within the rocksdb library is generally behind a const reference unsafe impl Sync for DB {} +// Specifies whether open DB for read only. +enum AccessType { + ReadWrite, + ReadOnly { error_if_log_file_exist: bool }, +} + impl DB { - /// Open a database with default options. + /// Opens a database with default options. pub fn open_default>(path: P) -> Result { let mut opts = Options::default(); opts.create_if_missing(true); DB::open(&opts, path) } - /// Open the database with the specified options. + /// Opens the database with the specified options. pub fn open>(opts: &Options, path: P) -> Result { DB::open_cf(opts, path, None::<&str>) } - /// Open a database with the given database options and column family names. + /// Opens the database for read only with the specified options. + pub fn open_for_read_only>( + opts: &Options, + path: P, + error_if_log_file_exist: bool, + ) -> Result { + DB::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist) + } + + /// Opens a database with the given database options and column family names. /// /// Column families opened using this function will be created with default `Options`. pub fn open_cf(opts: &Options, path: P, cfs: I) -> Result @@ -76,11 +91,51 @@ impl DB { .into_iter() .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default())); - DB::open_cf_descriptors(opts, path, cfs) + DB::open_cf_descriptors_internal(opts, path, cfs, AccessType::ReadWrite) + } + + /// Opens a database for read only with the given database options and column family names. + pub fn open_cf_for_read_only( + opts: &Options, + path: P, + cfs: I, + error_if_log_file_exist: bool, + ) -> Result + where + P: AsRef, + I: IntoIterator, + N: AsRef, + { + let cfs = cfs + .into_iter() + .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default())); + + DB::open_cf_descriptors_internal( + opts, + path, + cfs, + AccessType::ReadOnly { + error_if_log_file_exist, + }, + ) } - /// Open a database with the given database options and column family descriptors. + /// Opens a database with the given database options and column family descriptors. pub fn open_cf_descriptors(opts: &Options, path: P, cfs: I) -> Result + where + P: AsRef, + I: IntoIterator, + { + DB::open_cf_descriptors_internal(opts, path, cfs, AccessType::ReadWrite) + } + + /// Internal implementation for opening RocksDB. + fn open_cf_descriptors_internal( + opts: &Options, + path: P, + cfs: I, + access_type: AccessType, + ) -> Result where P: AsRef, I: IntoIterator, @@ -100,9 +155,7 @@ impl DB { let mut cf_map = BTreeMap::new(); if cfs.is_empty() { - unsafe { - db = ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr() as *const _)); - } + db = DB::open_raw(opts, cpath, access_type)?; } else { let mut cfs_v = cfs; // Always open the default column family. @@ -119,27 +172,25 @@ impl DB { .map(|cf| CString::new(cf.name.as_bytes()).unwrap()) .collect(); - let mut cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect(); + let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect(); // These handles will be populated by DB. let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect(); - let mut cfopts: Vec<_> = cfs_v + let cfopts: Vec<_> = cfs_v .iter() .map(|cf| cf.options.inner as *const _) .collect(); - unsafe { - db = ffi_try!(ffi::rocksdb_open_column_families( - opts.inner, - cpath.as_ptr(), - cfs_v.len() as c_int, - cfnames.as_mut_ptr(), - cfopts.as_mut_ptr(), - cfhandles.as_mut_ptr(), - )); - } - + db = DB::open_cf_raw( + opts, + cpath, + &cfs_v, + &cfnames, + &cfopts, + &mut cfhandles, + access_type, + )?; for handle in &cfhandles { if handle.is_null() { return Err(Error::new( @@ -164,6 +215,63 @@ impl DB { }) } + fn open_raw( + opts: &Options, + cpath: CString, + access_type: AccessType, + ) -> Result<*mut ffi::rocksdb_t, Error> { + let db = unsafe { + match access_type { + AccessType::ReadOnly { + error_if_log_file_exist, + } => ffi_try!(ffi::rocksdb_open_for_read_only( + opts.inner, + cpath.as_ptr() as *const _, + error_if_log_file_exist as c_uchar, + )), + AccessType::ReadWrite => { + ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr() as *const _)) + } + } + }; + Ok(db) + } + + fn open_cf_raw( + opts: &Options, + cpath: CString, + cfs_v: &[ColumnFamilyDescriptor], + cfnames: &[*const c_char], + cfopts: &[*const ffi::rocksdb_options_t], + cfhandles: &mut Vec<*mut ffi::rocksdb_column_family_handle_t>, + access_type: AccessType, + ) -> Result<*mut ffi::rocksdb_t, Error> { + let db = unsafe { + match access_type { + AccessType::ReadOnly { + error_if_log_file_exist, + } => ffi_try!(ffi::rocksdb_open_for_read_only_column_families( + opts.inner, + cpath.as_ptr(), + cfs_v.len() as c_int, + cfnames.as_ptr(), + cfopts.as_ptr(), + cfhandles.as_mut_ptr(), + error_if_log_file_exist as c_uchar, + )), + AccessType::ReadWrite => ffi_try!(ffi::rocksdb_open_column_families( + opts.inner, + cpath.as_ptr(), + cfs_v.len() as c_int, + cfnames.as_ptr(), + cfopts.as_ptr(), + cfhandles.as_mut_ptr(), + )), + } + }; + Ok(db) + } + pub fn list_cf>(opts: &Options, path: P) -> Result, Error> { let cpath = to_cpath(path)?; let mut length = 0; @@ -958,6 +1066,48 @@ impl fmt::Debug for DB { } } +#[test] +fn test_open_for_read_only() { + let path = "_rust_rocksdb_test_open_for_read_only"; + { + let db = DB::open_default(path).unwrap(); + db.put(b"k1", b"v1").unwrap(); + } + { + let opts = Options::default(); + let error_if_log_file_exist = false; + let db = DB::open_for_read_only(&opts, path, error_if_log_file_exist).unwrap(); + assert_eq!(db.get(b"k1").unwrap().unwrap(), b"v1"); + assert!(db.put(b"k2", b"v2").is_err()); + } + let opts = Options::default(); + assert!(DB::destroy(&opts, path).is_ok()); +} + +#[test] +fn test_open_cf_for_read_only() { + let path = "_rust_rocksdb_test_open_cf_for_read_only"; + let cfs = vec!["cf1"]; + { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + let db = DB::open_cf(&opts, path, cfs.clone()).unwrap(); + let cf1 = db.cf_handle("cf1").unwrap(); + db.put_cf(cf1, b"k1", b"v1").unwrap(); + } + { + let opts = Options::default(); + let error_if_log_file_exist = false; + let db = DB::open_cf_for_read_only(&opts, path, cfs, error_if_log_file_exist).unwrap(); + let cf1 = db.cf_handle("cf1").unwrap(); + assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1"); + assert!(db.put_cf(cf1, b"k2", b"v2").is_err()); + } + let opts = Options::default(); + assert!(DB::destroy(&opts, path).is_ok()); +} + #[test] fn external() { let path = "_rust_rocksdb_externaltest";