Add support for open_for_read_only APIs (#402)

master
wqfish 5 years ago committed by GitHub
parent 8a7996e943
commit a8b77cfd8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 194
      src/db.rs

@ -21,7 +21,7 @@ use crate::{
WriteBatch, WriteOptions, WriteBatch, WriteOptions,
}; };
use libc::{self, c_char, c_int, c_void, size_t}; use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::ffi::{CStr, CString}; use std::ffi::{CStr, CString};
use std::fmt; use std::fmt;
@ -50,20 +50,35 @@ unsafe impl Send for DB {}
// use within the rocksdb library is generally behind a const reference // use within the rocksdb library is generally behind a const reference
unsafe impl Sync for DB {} unsafe impl Sync for DB {}
// Specifies whether open DB for read only.
enum AccessType {
ReadWrite,
ReadOnly { error_if_log_file_exist: bool },
}
impl DB { impl DB {
/// Open a database with default options. /// Opens a database with default options.
pub fn open_default<P: AsRef<Path>>(path: P) -> Result<DB, Error> { pub fn open_default<P: AsRef<Path>>(path: P) -> Result<DB, Error> {
let mut opts = Options::default(); let mut opts = Options::default();
opts.create_if_missing(true); opts.create_if_missing(true);
DB::open(&opts, path) DB::open(&opts, path)
} }
/// Open the database with the specified options. /// Opens the database with the specified options.
pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<DB, Error> { pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<DB, Error> {
DB::open_cf(opts, path, None::<&str>) DB::open_cf(opts, path, None::<&str>)
} }
/// Open a database with the given database options and column family names. /// Opens the database for read only with the specified options.
pub fn open_for_read_only<P: AsRef<Path>>(
opts: &Options,
path: P,
error_if_log_file_exist: bool,
) -> Result<DB, Error> {
DB::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist)
}
/// Opens a database with the given database options and column family names.
/// ///
/// Column families opened using this function will be created with default `Options`. /// Column families opened using this function will be created with default `Options`.
pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<DB, Error> pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<DB, Error>
@ -76,11 +91,51 @@ impl DB {
.into_iter() .into_iter()
.map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default())); .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
DB::open_cf_descriptors(opts, path, cfs) DB::open_cf_descriptors_internal(opts, path, cfs, AccessType::ReadWrite)
}
/// Opens a database for read only with the given database options and column family names.
pub fn open_cf_for_read_only<P, I, N>(
opts: &Options,
path: P,
cfs: I,
error_if_log_file_exist: bool,
) -> Result<DB, Error>
where
P: AsRef<Path>,
I: IntoIterator<Item = N>,
N: AsRef<str>,
{
let cfs = cfs
.into_iter()
.map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
DB::open_cf_descriptors_internal(
opts,
path,
cfs,
AccessType::ReadOnly {
error_if_log_file_exist,
},
)
} }
/// Open a database with the given database options and column family descriptors. /// Opens a database with the given database options and column family descriptors.
pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<DB, Error> pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<DB, Error>
where
P: AsRef<Path>,
I: IntoIterator<Item = ColumnFamilyDescriptor>,
{
DB::open_cf_descriptors_internal(opts, path, cfs, AccessType::ReadWrite)
}
/// Internal implementation for opening RocksDB.
fn open_cf_descriptors_internal<P, I>(
opts: &Options,
path: P,
cfs: I,
access_type: AccessType,
) -> Result<DB, Error>
where where
P: AsRef<Path>, P: AsRef<Path>,
I: IntoIterator<Item = ColumnFamilyDescriptor>, I: IntoIterator<Item = ColumnFamilyDescriptor>,
@ -100,9 +155,7 @@ impl DB {
let mut cf_map = BTreeMap::new(); let mut cf_map = BTreeMap::new();
if cfs.is_empty() { if cfs.is_empty() {
unsafe { db = DB::open_raw(opts, cpath, access_type)?;
db = ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr() as *const _));
}
} else { } else {
let mut cfs_v = cfs; let mut cfs_v = cfs;
// Always open the default column family. // Always open the default column family.
@ -119,27 +172,25 @@ impl DB {
.map(|cf| CString::new(cf.name.as_bytes()).unwrap()) .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
.collect(); .collect();
let mut cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect(); let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
// These handles will be populated by DB. // These handles will be populated by DB.
let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect(); let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
let mut cfopts: Vec<_> = cfs_v let cfopts: Vec<_> = cfs_v
.iter() .iter()
.map(|cf| cf.options.inner as *const _) .map(|cf| cf.options.inner as *const _)
.collect(); .collect();
unsafe { db = DB::open_cf_raw(
db = ffi_try!(ffi::rocksdb_open_column_families( opts,
opts.inner, cpath,
cpath.as_ptr(), &cfs_v,
cfs_v.len() as c_int, &cfnames,
cfnames.as_mut_ptr(), &cfopts,
cfopts.as_mut_ptr(), &mut cfhandles,
cfhandles.as_mut_ptr(), access_type,
)); )?;
}
for handle in &cfhandles { for handle in &cfhandles {
if handle.is_null() { if handle.is_null() {
return Err(Error::new( return Err(Error::new(
@ -164,6 +215,63 @@ impl DB {
}) })
} }
fn open_raw(
opts: &Options,
cpath: CString,
access_type: AccessType,
) -> Result<*mut ffi::rocksdb_t, Error> {
let db = unsafe {
match access_type {
AccessType::ReadOnly {
error_if_log_file_exist,
} => ffi_try!(ffi::rocksdb_open_for_read_only(
opts.inner,
cpath.as_ptr() as *const _,
error_if_log_file_exist as c_uchar,
)),
AccessType::ReadWrite => {
ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr() as *const _))
}
}
};
Ok(db)
}
fn open_cf_raw(
opts: &Options,
cpath: CString,
cfs_v: &[ColumnFamilyDescriptor],
cfnames: &[*const c_char],
cfopts: &[*const ffi::rocksdb_options_t],
cfhandles: &mut Vec<*mut ffi::rocksdb_column_family_handle_t>,
access_type: AccessType,
) -> Result<*mut ffi::rocksdb_t, Error> {
let db = unsafe {
match access_type {
AccessType::ReadOnly {
error_if_log_file_exist,
} => ffi_try!(ffi::rocksdb_open_for_read_only_column_families(
opts.inner,
cpath.as_ptr(),
cfs_v.len() as c_int,
cfnames.as_ptr(),
cfopts.as_ptr(),
cfhandles.as_mut_ptr(),
error_if_log_file_exist as c_uchar,
)),
AccessType::ReadWrite => ffi_try!(ffi::rocksdb_open_column_families(
opts.inner,
cpath.as_ptr(),
cfs_v.len() as c_int,
cfnames.as_ptr(),
cfopts.as_ptr(),
cfhandles.as_mut_ptr(),
)),
}
};
Ok(db)
}
pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> { pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
let cpath = to_cpath(path)?; let cpath = to_cpath(path)?;
let mut length = 0; let mut length = 0;
@ -958,6 +1066,48 @@ impl fmt::Debug for DB {
} }
} }
#[test]
fn test_open_for_read_only() {
let path = "_rust_rocksdb_test_open_for_read_only";
{
let db = DB::open_default(path).unwrap();
db.put(b"k1", b"v1").unwrap();
}
{
let opts = Options::default();
let error_if_log_file_exist = false;
let db = DB::open_for_read_only(&opts, path, error_if_log_file_exist).unwrap();
assert_eq!(db.get(b"k1").unwrap().unwrap(), b"v1");
assert!(db.put(b"k2", b"v2").is_err());
}
let opts = Options::default();
assert!(DB::destroy(&opts, path).is_ok());
}
#[test]
fn test_open_cf_for_read_only() {
let path = "_rust_rocksdb_test_open_cf_for_read_only";
let cfs = vec!["cf1"];
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let db = DB::open_cf(&opts, path, cfs.clone()).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
db.put_cf(cf1, b"k1", b"v1").unwrap();
}
{
let opts = Options::default();
let error_if_log_file_exist = false;
let db = DB::open_cf_for_read_only(&opts, path, cfs, error_if_log_file_exist).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1");
assert!(db.put_cf(cf1, b"k2", b"v2").is_err());
}
let opts = Options::default();
assert!(DB::destroy(&opts, path).is_ok());
}
#[test] #[test]
fn external() { fn external() {
let path = "_rust_rocksdb_externaltest"; let path = "_rust_rocksdb_externaltest";

Loading…
Cancel
Save