Merge pull request #11 from dgrnbrg/master

Add support for WriteBatch
master
Tyler Neely 9 years ago
commit 14dc3d00dd
  1. 4
      src/ffi.rs
  2. 2
      src/lib.rs
  3. 4
      src/main.rs
  4. 2
      src/merge_operator.rs
  5. 139
      src/rocksdb.rs

@ -272,6 +272,10 @@ extern {
pub fn rocksdb_iter_get_error(iter: RocksDBIterator, pub fn rocksdb_iter_get_error(iter: RocksDBIterator,
err: *const *const u8); err: *const *const u8);
// Write batch // Write batch
pub fn rocksdb_write(db: RocksDBInstance,
writeopts: RocksDBWriteOptions,
batch : RocksDBWriteBatch,
err: *mut i8);
pub fn rocksdb_writebatch_create() -> RocksDBWriteBatch; pub fn rocksdb_writebatch_create() -> RocksDBWriteBatch;
pub fn rocksdb_writebatch_create_from(rep: *const u8, pub fn rocksdb_writebatch_create_from(rep: *const u8,
size: size_t) -> RocksDBWriteBatch; size: size_t) -> RocksDBWriteBatch;

@ -34,6 +34,8 @@ pub use rocksdb::{
RocksDB, RocksDB,
RocksDBResult, RocksDBResult,
RocksDBVector, RocksDBVector,
WriteBatch,
Writable,
}; };
pub use rocksdb_options::{ pub use rocksdb_options::{
RocksDBOptions, RocksDBOptions,

@ -18,7 +18,7 @@
extern crate rocksdb; extern crate rocksdb;
extern crate test; extern crate test;
use rocksdb::{RocksDBOptions, RocksDB, MergeOperands, new_bloom_filter}; use rocksdb::{RocksDBOptions, RocksDB, MergeOperands, new_bloom_filter, Writable};
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
fn main() { fn main() {
@ -88,7 +88,7 @@ mod tests {
use test::Bencher; use test::Bencher;
use std::thread::sleep_ms; use std::thread::sleep_ms;
use rocksdb::{RocksDBOptions, RocksDB, MergeOperands, new_bloom_filter}; use rocksdb::{RocksDBOptions, RocksDB, MergeOperands, new_bloom_filter, Writable};
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
fn tuned_for_somebody_elses_disk() -> RocksDB { fn tuned_for_somebody_elses_disk() -> RocksDB {

@ -21,7 +21,7 @@ use std::ptr;
use std::slice; use std::slice;
use rocksdb_options::{RocksDBOptions}; use rocksdb_options::{RocksDBOptions};
use rocksdb::{RocksDB, RocksDBResult, RocksDBVector}; use rocksdb::{RocksDB, RocksDBResult, RocksDBVector, Writable};
pub struct MergeOperatorCallback { pub struct MergeOperatorCallback {
pub name: CString, pub name: CString,

@ -32,6 +32,17 @@ pub struct RocksDB {
inner: rocksdb_ffi::RocksDBInstance, inner: rocksdb_ffi::RocksDBInstance,
} }
pub struct WriteBatch {
inner: rocksdb_ffi::RocksDBWriteBatch
}
// This is for the RocksDB and write batches to share the same API
pub trait Writable {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), &str>;
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), &str>;
fn delete(&self, key: &[u8]) -> Result<(),&str>;
}
fn error_message<'a>(ptr: *const i8) -> &'a str { fn error_message<'a>(ptr: *const i8) -> &'a str {
unsafe { unsafe {
return from_utf8(CStr::from_ptr(ptr).to_bytes()).unwrap(); return from_utf8(CStr::from_ptr(ptr).to_bytes()).unwrap();
@ -117,27 +128,11 @@ impl RocksDB {
} }
} }
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), &str> { pub fn write(&self, batch: WriteBatch) -> Result<(), &str> {
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8;
rocksdb_ffi::rocksdb_put(self.inner, writeopts, key.as_ptr(),
key.len() as size_t, value.as_ptr(),
value.len() as size_t, err);
if !err.is_null() {
return Err(error_message(err));
}
return Ok(())
}
}
pub fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), &str> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8; let err = 0 as *mut i8;
rocksdb_ffi::rocksdb_merge(self.inner, writeopts, key.as_ptr(), rocksdb_ffi::rocksdb_write(self.inner, writeopts, batch.inner, err);
key.len() as size_t, value.as_ptr(),
value.len() as size_t, err);
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
} }
@ -172,7 +167,41 @@ impl RocksDB {
} }
} }
pub fn delete(&self, key: &[u8]) -> Result<(),&str> { pub fn close(&self) {
unsafe { rocksdb_ffi::rocksdb_close(self.inner); }
}
}
impl Writable for RocksDB {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), &str> {
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8;
rocksdb_ffi::rocksdb_put(self.inner, writeopts, key.as_ptr(),
key.len() as size_t, value.as_ptr(),
value.len() as size_t, err);
if !err.is_null() {
return Err(error_message(err));
}
return Ok(())
}
}
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), &str> {
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8;
rocksdb_ffi::rocksdb_merge(self.inner, writeopts, key.as_ptr(),
key.len() as size_t, value.as_ptr(),
value.len() as size_t, err);
if !err.is_null() {
return Err(error_message(err));
}
return Ok(())
}
}
fn delete(&self, key: &[u8]) -> Result<(),&str> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8; let err = 0 as *mut i8;
@ -184,9 +213,51 @@ impl RocksDB {
return Ok(()) return Ok(())
} }
} }
}
pub fn close(&self) { impl WriteBatch {
unsafe { rocksdb_ffi::rocksdb_close(self.inner); } pub fn new() -> WriteBatch {
WriteBatch {
inner: unsafe {
rocksdb_ffi::rocksdb_writebatch_create()
}
}
}
}
impl Drop for WriteBatch {
fn drop(&mut self) {
unsafe {
rocksdb_ffi::rocksdb_writebatch_destroy(self.inner)
}
}
}
impl Writable for WriteBatch {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), &str> {
unsafe {
rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(),
key.len() as size_t, value.as_ptr(),
value.len() as size_t);
return Ok(())
}
}
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), &str> {
unsafe {
rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(),
key.len() as size_t, value.as_ptr(),
value.len() as size_t);
return Ok(())
}
}
fn delete(&self, key: &[u8]) -> Result<(),&str> {
unsafe {
rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(),
key.len() as size_t);
return Ok(())
}
} }
} }
@ -310,3 +381,29 @@ fn external() {
let opts = RocksDBOptions::new(); let opts = RocksDBOptions::new();
assert!(RocksDB::destroy(opts, path).is_ok()); assert!(RocksDB::destroy(opts, path).is_ok());
} }
#[test]
fn writebatch_works() {
let path = "_rust_rocksdb_writebacktest";
let db = RocksDB::open_default(path).unwrap();
{ // test put
let batch = WriteBatch::new();
assert!(db.get(b"k1").is_none());
batch.put(b"k1", b"v1111");
assert!(db.get(b"k1").is_none());
let p = db.write(batch);
assert!(p.is_ok());
let r: RocksDBResult<RocksDBVector, &str> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "v1111");
}
{ // test delete
let batch = WriteBatch::new();
batch.delete(b"k1");
let p = db.write(batch);
assert!(p.is_ok());
assert!(db.get(b"k1").is_none());
}
db.close();
let opts = RocksDBOptions::new();
assert!(RocksDB::destroy(opts, path).is_ok());
}

Loading…
Cancel
Save