diff --git a/src/ffi.rs b/src/ffi.rs index 62712d0..f2728f2 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -189,6 +189,9 @@ extern "C" { -> DBInstance; pub fn rocksdb_writeoptions_create() -> DBWriteOptions; pub fn rocksdb_writeoptions_destroy(writeopts: DBWriteOptions); + pub fn rocksdb_writeoptions_set_sync(writeopts: DBWriteOptions, v: bool); + pub fn rocksdb_writeoptions_disable_WAL(writeopts: DBWriteOptions, + v: c_int); pub fn rocksdb_put(db: DBInstance, writeopts: DBWriteOptions, k: *const u8, diff --git a/src/lib.rs b/src/lib.rs index 9d27965..8b774f4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,7 @@ pub use ffi as rocksdb_ffi; pub use ffi::{DBCompactionStyle, DBComparator, new_bloom_filter}; pub use rocksdb::{DB, DBIterator, DBVector, Direction, IteratorMode, Writable, WriteBatch}; -pub use rocksdb_options::{BlockBasedOptions, Options}; +pub use rocksdb_options::{BlockBasedOptions, Options, WriteOptions}; pub use merge_operator::MergeOperands; pub mod rocksdb; pub mod ffi; diff --git a/src/rocksdb.rs b/src/rocksdb.rs index cbb5bf6..82932fc 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -27,7 +27,7 @@ use std::str::from_utf8; use self::libc::{c_void, size_t}; use rocksdb_ffi::{self, DBCFHandle, error_message}; -use rocksdb_options::Options; +use rocksdb_options::{Options, WriteOptions}; pub struct DB { inner: rocksdb_ffi::DBInstance, @@ -381,16 +381,17 @@ impl DB { Ok(()) } - pub fn write(&self, batch: WriteBatch) -> Result<(), String> { - let writeopts = unsafe { rocksdb_ffi::rocksdb_writeoptions_create() }; + pub fn write_opt(&self, + batch: WriteBatch, + writeopts: &WriteOptions) + -> Result<(), String> { let mut err: *const i8 = 0 as *const i8; let err_ptr: *mut *const i8 = &mut err; unsafe { rocksdb_ffi::rocksdb_write(self.inner, - writeopts.clone(), + writeopts.inner, batch.inner, err_ptr); - rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); } if !err.is_null() { return Err(error_message(err)); @@ -398,6 +399,10 @@ impl DB { return Ok(()); } + pub fn write(&self, batch: WriteBatch) -> Result<(), String> { + self.write_opt(batch, &WriteOptions::new()) + } + pub fn get(&self, key: &[u8]) -> Result, String> { unsafe { let readopts = rocksdb_ffi::rocksdb_readoptions_create(); @@ -538,22 +543,22 @@ impl DB { pub fn snapshot(&self) -> Snapshot { Snapshot::new(self) } -} -impl Writable for DB { - fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { + pub fn put_opt(&self, + key: &[u8], + value: &[u8], + writeopts: &WriteOptions) + -> Result<(), String> { unsafe { - let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let mut err: *const i8 = 0 as *const i8; let err_ptr: *mut *const i8 = &mut err; rocksdb_ffi::rocksdb_put(self.inner, - writeopts.clone(), + writeopts.inner, key.as_ptr(), key.len() as size_t, value.as_ptr(), value.len() as size_t, err_ptr); - rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); if !err.is_null() { return Err(error_message(err)); } @@ -561,106 +566,105 @@ impl Writable for DB { } } - fn put_cf(&self, - cf: DBCFHandle, - key: &[u8], - value: &[u8]) - -> Result<(), String> { + pub fn put_cf_opt(&self, + cf: DBCFHandle, + key: &[u8], + value: &[u8], + writeopts: &WriteOptions) + -> Result<(), String> { unsafe { - let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let mut err: *const i8 = 0 as *const i8; let err_ptr: *mut *const i8 = &mut err; rocksdb_ffi::rocksdb_put_cf(self.inner, - writeopts.clone(), + writeopts.inner, cf, key.as_ptr(), key.len() as size_t, value.as_ptr(), value.len() as size_t, err_ptr); - rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); if !err.is_null() { return Err(error_message(err)); } Ok(()) } } - - fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { + pub fn merge_opt(&self, + key: &[u8], + value: &[u8], + writeopts: &WriteOptions) + -> Result<(), String> { unsafe { - let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let mut err: *const i8 = 0 as *const i8; let err_ptr: *mut *const i8 = &mut err; rocksdb_ffi::rocksdb_merge(self.inner, - writeopts.clone(), + writeopts.inner, key.as_ptr(), key.len() as size_t, value.as_ptr(), value.len() as size_t, err_ptr); - rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); if !err.is_null() { return Err(error_message(err)); } Ok(()) } } - - fn merge_cf(&self, - cf: DBCFHandle, - key: &[u8], - value: &[u8]) - -> Result<(), String> { + fn merge_cf_opt(&self, + cf: DBCFHandle, + key: &[u8], + value: &[u8], + writeopts: &WriteOptions) + -> Result<(), String> { unsafe { - let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let mut err: *const i8 = 0 as *const i8; let err_ptr: *mut *const i8 = &mut err; rocksdb_ffi::rocksdb_merge_cf(self.inner, - writeopts.clone(), + writeopts.inner, cf, key.as_ptr(), key.len() as size_t, value.as_ptr(), value.len() as size_t, err_ptr); - rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); if !err.is_null() { return Err(error_message(err)); } Ok(()) } } - - fn delete(&self, key: &[u8]) -> Result<(), String> { + fn delete_opt(&self, + key: &[u8], + writeopts: &WriteOptions) + -> Result<(), String> { unsafe { - let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let mut err: *const i8 = 0 as *const i8; let err_ptr: *mut *const i8 = &mut err; rocksdb_ffi::rocksdb_delete(self.inner, - writeopts.clone(), + writeopts.inner, key.as_ptr(), key.len() as size_t, err_ptr); - rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); if !err.is_null() { return Err(error_message(err)); } Ok(()) } } - - fn delete_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<(), String> { + fn delete_cf_opt(&self, + cf: DBCFHandle, + key: &[u8], + writeopts: &WriteOptions) + -> Result<(), String> { unsafe { - let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let mut err: *const i8 = 0 as *const i8; let err_ptr: *mut *const i8 = &mut err; rocksdb_ffi::rocksdb_delete_cf(self.inner, - writeopts.clone(), + writeopts.inner, cf, key.as_ptr(), key.len() as size_t, err_ptr); - rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); if !err.is_null() { return Err(error_message(err)); } @@ -669,6 +673,40 @@ impl Writable for DB { } } +impl Writable for DB { + fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { + self.put_opt(key, value, &WriteOptions::new()) + } + + fn put_cf(&self, + cf: DBCFHandle, + key: &[u8], + value: &[u8]) + -> Result<(), String> { + self.put_cf_opt(cf, key, value, &WriteOptions::new()) + } + + fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { + self.merge_opt(key, value, &WriteOptions::new()) + } + + fn merge_cf(&self, + cf: DBCFHandle, + key: &[u8], + value: &[u8]) + -> Result<(), String> { + self.merge_cf_opt(cf, key, value, &WriteOptions::new()) + } + + fn delete(&self, key: &[u8]) -> Result<(), String> { + self.delete_opt(key, &WriteOptions::new()) + } + + fn delete_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<(), String> { + self.delete_cf_opt(cf, key, &WriteOptions::new()) + } +} + impl WriteBatch { pub fn new() -> WriteBatch { WriteBatch { diff --git a/src/rocksdb_options.rs b/src/rocksdb_options.rs index f561cb5..35d9f6d 100644 --- a/src/rocksdb_options.rs +++ b/src/rocksdb_options.rs @@ -30,6 +30,10 @@ pub struct Options { pub inner: rocksdb_ffi::DBOptions, } +pub struct WriteOptions { + pub inner: rocksdb_ffi::DBWriteOptions, +} + impl Drop for Options { fn drop(&mut self) { unsafe { @@ -46,6 +50,14 @@ impl Drop for BlockBasedOptions { } } +impl Drop for WriteOptions { + fn drop(&mut self) { + unsafe { + rocksdb_ffi::rocksdb_writeoptions_destroy(self.inner); + } + } +} + impl BlockBasedOptions { pub fn new() -> BlockBasedOptions { let block_opts = unsafe { @@ -315,3 +327,19 @@ impl Options { } } } + +impl WriteOptions { + pub fn new() -> WriteOptions { + let write_opts = unsafe { rocksdb_ffi::rocksdb_writeoptions_create() }; + let rocksdb_ffi::DBWriteOptions(opt_ptr) = write_opts; + if opt_ptr.is_null() { + panic!("Could not create rocksdb write options".to_string()); + } + WriteOptions { inner: write_opts } + } + pub fn set_sync(&mut self, sync: bool) { + unsafe { + rocksdb_ffi::rocksdb_writeoptions_set_sync(self.inner, sync); + } + } +}