nuke DBResult

master
Tyler Neely 9 years ago
parent a2334ab39a
commit 0fc2441a2e
  1. 2
      src/lib.rs
  2. 38
      src/main.rs
  3. 17
      src/merge_operator.rs
  4. 106
      src/rocksdb.rs
  5. 15
      test/test_column_family.rs
  6. 141
      test/test_column_family.rs.orig
  7. 6
      test/test_multithreaded.rs
  8. 56
      test/test_multithreaded.rs.orig

@ -18,7 +18,7 @@
pub use ffi as rocksdb_ffi; pub use ffi as rocksdb_ffi;
pub use ffi::{new_bloom_filter, DBCompactionStyle, DBComparator}; pub use ffi::{new_bloom_filter, DBCompactionStyle, DBComparator};
pub use rocksdb::{DB, DBResult, DBVector, WriteBatch, Writable, Direction}; pub use rocksdb::{DB, DBVector, WriteBatch, Writable, Direction};
pub use rocksdb_options::{Options, BlockBasedOptions}; pub use rocksdb_options::{Options, BlockBasedOptions};
pub use merge_operator::MergeOperands; pub use merge_operator::MergeOperands;
pub mod rocksdb; pub mod rocksdb;

@ -49,16 +49,18 @@ fn main() {
let path = "/tmp/rust-rocksdb"; let path = "/tmp/rust-rocksdb";
let mut db = DB::open_default(path).unwrap(); let mut db = DB::open_default(path).unwrap();
assert!(db.put(b"my key", b"my value").is_ok()); assert!(db.put(b"my key", b"my value").is_ok());
db.get(b"my key").map( |value| { match db.get(b"my key") {
Ok(Some(value)) => {
match value.to_utf8() { match value.to_utf8() {
Some(v) => Some(v) =>
println!("retrieved utf8 value: {}", v), println!("retrieved utf8 value: {}", v),
None => None =>
println!("did not read valid utf-8 out of the db"), println!("did not read valid utf-8 out of the db"),
} }
}) },
.on_absent( || { println!("value not found") }) Err(e) => println!("error retrieving value: {}", e),
.on_error( |e| { println!("error retrieving value: {}", e) }); _ => panic!("value not present!"),
}
assert!(db.delete(b"my key").is_ok()); assert!(db.delete(b"my key").is_ok());
@ -88,24 +90,25 @@ fn custom_merge() {
opts.create_if_missing(true); opts.create_if_missing(true);
opts.add_merge_operator("test operator", concat_merge); opts.add_merge_operator("test operator", concat_merge);
{ {
let mut db = DB::open(&opts, path).unwrap(); let db = DB::open(&opts, path).unwrap();
db.put(b"k1", b"a"); db.put(b"k1", b"a").unwrap();
db.merge(b"k1", b"b"); db.merge(b"k1", b"b").unwrap();
db.merge(b"k1", b"c"); db.merge(b"k1", b"c").unwrap();
db.merge(b"k1", b"d"); db.merge(b"k1", b"d").unwrap();
db.merge(b"k1", b"efg"); db.merge(b"k1", b"efg").unwrap();
db.merge(b"k1", b"h"); db.merge(b"k1", b"h").unwrap();
db.get(b"k1").map( |value| { match db.get(b"k1") {
Ok(Some(value)) => {
match value.to_utf8() { match value.to_utf8() {
Some(v) => Some(v) =>
println!("retrieved utf8 value: {}", v), println!("retrieved utf8 value: {}", v),
None => None =>
println!("did not read valid utf-8 out of the db"), println!("did not read valid utf-8 out of the db"),
} }
}) }
.on_absent( || { println!("value not found") }) Err(e) => println!("error retrieving value: {}", e),
.on_error( |e| { println!("error retrieving value: {}", e) }); _ => panic!("value not present!"),
}
} }
DB::destroy(&opts, path).is_ok(); DB::destroy(&opts, path).is_ok();
} }
@ -130,8 +133,7 @@ fn main() {
None => panic!("value corrupted"), None => panic!("value corrupted"),
} }
}) })
.on_absent( || { panic!("value not found") }) .or_else( |e| { panic!("error retrieving value: {}", e) });
.on_error( |e| { panic!("error retrieving value: {}", e) });
db.delete(b"k1"); db.delete(b"k1");
} }
} }

@ -21,7 +21,7 @@ use std::ptr;
use std::slice; use std::slice;
use rocksdb_options::Options; use rocksdb_options::Options;
use rocksdb::{DB, DBResult, DBVector, Writable}; use rocksdb::{DB, DBVector, Writable};
pub struct MergeOperatorCallback { pub struct MergeOperatorCallback {
pub name: CString, pub name: CString,
@ -196,21 +196,24 @@ fn mergetest() {
db.merge(b"k1", b"efg"); db.merge(b"k1", b"efg");
let m = db.merge(b"k1", b"h"); let m = db.merge(b"k1", b"h");
assert!(m.is_ok()); assert!(m.is_ok());
db.get(b"k1").map( |value| { match db.get(b"k1") {
Ok(Some(value)) => {
match value.to_utf8() { match value.to_utf8() {
Some(v) => Some(v) =>
println!("retrieved utf8 value: {}", v), println!("retrieved utf8 value: {}", v),
None => None =>
println!("did not read valid utf-8 out of the db"), println!("did not read valid utf-8 out of the db"),
} }
}).on_absent( || { println!("value not present!") }) },
.on_error( |e| { println!("error reading value")}); //: {", e) }); Err(e) => { println!("error reading value")},
_ => panic!("value not present"),
}
assert!(m.is_ok()); assert!(m.is_ok());
let r: DBResult<DBVector, String> = db.get(b"k1"); let r: Result<Option<DBVector>, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); assert!(r.unwrap().unwrap().to_utf8().unwrap() == "abcdefgh");
assert!(db.delete(b"k1").is_ok()); assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").unwrap().is_none());
} }
assert!(DB::destroy(&opts, path).is_ok()); assert!(DB::destroy(&opts, path).is_ok());
} }

@ -327,11 +327,11 @@ impl DB {
return Ok(()) return Ok(())
} }
pub fn get(&self, key: &[u8]) -> DBResult<DBVector, String> { pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, String> {
unsafe { unsafe {
let readopts = rocksdb_ffi::rocksdb_readoptions_create(); let readopts = rocksdb_ffi::rocksdb_readoptions_create();
if readopts.0.is_null() { if readopts.0.is_null() {
return DBResult::Error("Unable to create rocksdb read \ return Err("Unable to create rocksdb read \
options. This is a fairly trivial call, and its failure \ options. This is a fairly trivial call, and its failure \
may be indicative of a mis-compiled or mis-loaded rocksdb \ may be indicative of a mis-compiled or mis-loaded rocksdb \
library.".to_string()); library.".to_string());
@ -345,22 +345,22 @@ impl DB {
key.as_ptr(), key.len() as size_t, val_len_ptr, err_ptr) as *mut u8; key.as_ptr(), key.len() as size_t, val_len_ptr, err_ptr) as *mut u8;
rocksdb_ffi::rocksdb_readoptions_destroy(readopts); rocksdb_ffi::rocksdb_readoptions_destroy(readopts);
if !err.is_null() { if !err.is_null() {
return DBResult::Error(error_message(err)); return Err(error_message(err));
} }
match val.is_null() { match val.is_null() {
true => DBResult::None, true => Ok(None),
false => { false => {
DBResult::Some(DBVector::from_c(val, val_len)) Ok(Some(DBVector::from_c(val, val_len)))
} }
} }
} }
} }
pub fn get_cf(&self, cf: DBCFHandle, key: &[u8]) -> DBResult<DBVector, String> { pub fn get_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<Option<DBVector>, String> {
unsafe { unsafe {
let readopts = rocksdb_ffi::rocksdb_readoptions_create(); let readopts = rocksdb_ffi::rocksdb_readoptions_create();
if readopts.0.is_null() { if readopts.0.is_null() {
return DBResult::Error("Unable to create rocksdb read \ return Err("Unable to create rocksdb read \
options. This is a fairly trivial call, and its failure \ options. This is a fairly trivial call, and its failure \
may be indicative of a mis-compiled or mis-loaded rocksdb \ may be indicative of a mis-compiled or mis-loaded rocksdb \
library.".to_string()); library.".to_string());
@ -375,12 +375,12 @@ impl DB {
err_ptr) as *mut u8; err_ptr) as *mut u8;
rocksdb_ffi::rocksdb_readoptions_destroy(readopts); rocksdb_ffi::rocksdb_readoptions_destroy(readopts);
if !err.is_null() { if !err.is_null() {
return DBResult::Error(error_message(err)); return Err(error_message(err));
} }
match val.is_null() { match val.is_null() {
true => DBResult::None, true => Ok(None),
false => { false => {
DBResult::Some(DBVector::from_c(val, val_len)) Ok(Some(DBVector::from_c(val, val_len)))
} }
} }
} }
@ -691,76 +691,6 @@ impl DBVector {
} }
} }
// DBResult exists because of the inherent difference between
// an operational failure and the absence of a possible result.
#[derive(Clone, PartialEq, PartialOrd, Eq, Ord, Debug)]
pub enum DBResult<T, E> {
Some(T),
None,
Error(E),
}
impl <T, E> DBResult<T, E> {
pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> DBResult<U, E> {
match self {
DBResult::Some(x) => DBResult::Some(f(x)),
DBResult::None => DBResult::None,
DBResult::Error(e) => DBResult::Error(e),
}
}
pub fn unwrap(self) -> T {
match self {
DBResult::Some(x) => x,
DBResult::None =>
panic!("Attempted unwrap on DBResult::None"),
DBResult::Error(_) =>
panic!("Attempted unwrap on DBResult::Error"),
}
}
pub fn on_error<U, F: FnOnce(E) -> U>(self, f: F) -> DBResult<T, U> {
match self {
DBResult::Some(x) => DBResult::Some(x),
DBResult::None => DBResult::None,
DBResult::Error(e) => DBResult::Error(f(e)),
}
}
pub fn on_absent<F: FnOnce() -> ()>(self, f: F) -> DBResult<T, E> {
match self {
DBResult::Some(x) => DBResult::Some(x),
DBResult::None => {
f();
DBResult::None
},
DBResult::Error(e) => DBResult::Error(e),
}
}
pub fn is_some(self) -> bool {
match self {
DBResult::Some(_) => true,
DBResult::None => false,
DBResult::Error(_) => false,
}
}
pub fn is_none(self) -> bool {
match self {
DBResult::Some(_) => false,
DBResult::None => true,
DBResult::Error(_) => false,
}
}
pub fn is_error(self) -> bool {
match self {
DBResult::Some(_) => false,
DBResult::None => false,
DBResult::Error(_) => true,
}
}
}
#[test] #[test]
fn external() { fn external() {
let path = "_rust_rocksdb_externaltest"; let path = "_rust_rocksdb_externaltest";
@ -768,10 +698,10 @@ fn external() {
let mut db = DB::open_default(path).unwrap(); let mut db = DB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111"); let p = db.put(b"k1", b"v1111");
assert!(p.is_ok()); assert!(p.is_ok());
let r: DBResult<DBVector, String> = db.get(b"k1"); let r: Result<Option<DBVector>, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "v1111"); assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111");
assert!(db.delete(b"k1").is_ok()); assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").unwrap().is_none());
} }
let opts = Options::new(); let opts = Options::new();
let result = DB::destroy(&opts, path); let result = DB::destroy(&opts, path);
@ -797,20 +727,20 @@ fn writebatch_works() {
let mut db = DB::open_default(path).unwrap(); let mut db = DB::open_default(path).unwrap();
{ // test put { // test put
let mut batch = WriteBatch::new(); let mut batch = WriteBatch::new();
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").unwrap().is_none());
batch.put(b"k1", b"v1111"); batch.put(b"k1", b"v1111");
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").unwrap().is_none());
let p = db.write(batch); let p = db.write(batch);
assert!(p.is_ok()); assert!(p.is_ok());
let r: DBResult<DBVector, String> = db.get(b"k1"); let r: Result<Option<DBVector>, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "v1111"); assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111");
} }
{ // test delete { // test delete
let mut batch = WriteBatch::new(); let mut batch = WriteBatch::new();
batch.delete(b"k1"); batch.delete(b"k1");
let p = db.write(batch); let p = db.write(batch);
assert!(p.is_ok()); assert!(p.is_ok());
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").unwrap().is_none());
} }
} }
let opts = Options::new(); let opts = Options::new();

@ -13,7 +13,7 @@
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
use rocksdb::{Options, DB, DBResult, Writable, Direction, MergeOperands}; use rocksdb::{Options, DB, Writable, Direction, MergeOperands};
#[test] #[test]
pub fn test_column_family() { pub fn test_column_family() {
@ -67,7 +67,7 @@ pub fn test_column_family() {
}; };
let cf1 = *db.cf_handle("cf1").unwrap(); let cf1 = *db.cf_handle("cf1").unwrap();
assert!(db.put_cf(cf1, b"k1", b"v1").is_ok()); assert!(db.put_cf(cf1, b"k1", b"v1").is_ok());
assert!(db.get_cf(cf1, b"k1").unwrap().to_utf8().unwrap() == "v1"); assert!(db.get_cf(cf1, b"k1").unwrap().unwrap().to_utf8().unwrap() == "v1");
let p = db.put_cf(cf1, b"k1", b"a"); let p = db.put_cf(cf1, b"k1", b"a");
assert!(p.is_ok()); assert!(p.is_ok());
db.merge_cf(cf1, b"k1", b"b"); db.merge_cf(cf1, b"k1", b"b");
@ -77,20 +77,23 @@ pub fn test_column_family() {
let m = db.merge_cf(cf1, b"k1", b"h"); let m = db.merge_cf(cf1, b"k1", b"h");
println!("m is {:?}", m); println!("m is {:?}", m);
// TODO assert!(m.is_ok()); // TODO assert!(m.is_ok());
db.get(b"k1").map( |value| { match db.get(b"k1") {
Ok(Some(value)) => {
match value.to_utf8() { match value.to_utf8() {
Some(v) => Some(v) =>
println!("retrieved utf8 value: {}", v), println!("retrieved utf8 value: {}", v),
None => None =>
println!("did not read valid utf-8 out of the db"), println!("did not read valid utf-8 out of the db"),
} }
}).on_absent( || { println!("value not present!") }) },
.on_error( |e| { println!("error reading value")}); //: {", e) }); Err(e) => println!("error reading value"),
_ => panic!("value not present!"),
}
let r = db.get_cf(cf1, b"k1"); let r = db.get_cf(cf1, b"k1");
// TODO assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); // TODO assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh");
assert!(db.delete(b"k1").is_ok()); assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").unwrap().is_none());
} }
// TODO should be able to use writebatch ops with a cf // TODO should be able to use writebatch ops with a cf
{ {

@ -0,0 +1,141 @@
/*
Copyright 2014 Tyler Neely
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
use rocksdb::{Options, DB, DBResult, Writable, Direction, MergeOperands};
#[test]
pub fn test_column_family() {
let path = "_rust_rocksdb_cftest";
// should be able to create column families
{
let mut opts = Options::new();
opts.create_if_missing(true);
opts.add_merge_operator("test operator", test_provided_merge);
let mut db = DB::open(&opts, path).unwrap();
let opts = Options::new();
match db.create_cf("cf1", &opts) {
Ok(_) => println!("cf1 created successfully"),
Err(e) => {
panic!("could not create column family: {}", e);
},
}
}
// should fail to open db without specifying same column families
{
let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge);
match DB::open(&opts, path) {
Ok(_) => panic!("should not have opened DB successfully without specifying column
families"),
Err(e) => assert!(e.starts_with("Invalid argument: You have to open all column families.")),
}
}
// should properly open db when specyfing all column families
{
let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge);
match DB::open_cf(&opts, path, &["cf1"]) {
Ok(_) => println!("successfully opened db with column family"),
Err(e) => panic!("failed to open db with column family: {}", e),
}
}
// TODO should be able to write, read, merge, batch, and iterate over a cf
{
let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge);
let mut db = match DB::open_cf(&opts, path, &["cf1"]) {
Ok(db) => {
println!("successfully opened db with column family");
db
},
Err(e) => panic!("failed to open db with column family: {}", e),
};
let cf1 = *db.cf_handle("cf1").unwrap();
assert!(db.put_cf(cf1, b"k1", b"v1").is_ok());
assert!(db.get_cf(cf1, b"k1").unwrap().unwrap().to_utf8().unwrap() == "v1");
let p = db.put_cf(cf1, b"k1", b"a");
assert!(p.is_ok());
db.merge_cf(cf1, b"k1", b"b");
db.merge_cf(cf1, b"k1", b"c");
db.merge_cf(cf1, b"k1", b"d");
db.merge_cf(cf1, b"k1", b"efg");
let m = db.merge_cf(cf1, b"k1", b"h");
println!("m is {:?}", m);
// TODO assert!(m.is_ok());
match db.get(b"k1") {
Ok(Some(value)) => {
match value.to_utf8() {
Some(v) =>
println!("retrieved utf8 value: {}", v),
None =>
println!("did not read valid utf-8 out of the db"),
}
<<<<<<< Updated upstream
}).on_absent( || { println!("value not present!") })
.on_error( |e| { println!("error reading value")}); //: {", e) });
=======
},
Err(e) => println!("error reading value"),
_ => panic!("value not present!"),
}
>>>>>>> Stashed changes
let r = db.get_cf(cf1, b"k1");
// TODO assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh");
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").unwrap().is_none());
}
// TODO should be able to use writebatch ops with a cf
{
}
// TODO should be able to iterate over a cf
{
}
// should b able to drop a cf
{
let mut db = DB::open_cf(&Options::new(), path, &["cf1"]).unwrap();
match db.drop_cf("cf1") {
Ok(_) => println!("cf1 successfully dropped."),
Err(e) => panic!("failed to drop column family: {}", e),
}
}
assert!(DB::destroy(&Options::new(), path).is_ok());
}
fn test_provided_merge(new_key: &[u8],
existing_val: Option<&[u8]>,
mut operands: &mut MergeOperands)
-> Vec<u8> {
let nops = operands.size_hint().0;
let mut result: Vec<u8> = Vec::with_capacity(nops);
match existing_val {
Some(v) => {
for e in v {
result.push(*e);
}
},
None => (),
}
for op in operands {
for e in op {
result.push(*e);
}
}
result
}

@ -1,5 +1,5 @@
use rocksdb::{Options, DB, Writable, Direction, DBResult}; use rocksdb::{Options, DB, Writable};
use std::thread::{self, Builder}; use std::thread;
use std::sync::Arc; use std::sync::Arc;
const N: usize = 100_000; const N: usize = 100_000;
@ -31,7 +31,7 @@ pub fn test_multithreaded() {
let j3 = thread::spawn(move|| { let j3 = thread::spawn(move|| {
for i in 1..N { for i in 1..N {
match db3.get(b"key") { match db3.get(b"key") {
DBResult::Some(v) => { Ok(Some(v)) => {
if &v[..] != b"value1" && &v[..] != b"value2" { if &v[..] != b"value1" && &v[..] != b"value2" {
assert!(false); assert!(false);
} }

@ -0,0 +1,56 @@
<<<<<<< Updated upstream
use rocksdb::{Options, DB, Writable, Direction, DBResult};
use std::thread::{self, Builder};
=======
use rocksdb::{Options, DB, Writable};
use std::thread;
>>>>>>> Stashed changes
use std::sync::Arc;
const N: usize = 100_000;
#[test]
pub fn test_multithreaded() {
let path = "_rust_rocksdb_multithreadtest";
{
let db = DB::open_default(path).unwrap();
let db = Arc::new(db);
db.put(b"key", b"value1");
let db1 = db.clone();
let j1 = thread::spawn(move|| {
for i in 1..N {
db1.put(b"key", b"value1");
}
});
let db2 = db.clone();
let j2 = thread::spawn(move|| {
for i in 1..N {
db2.put(b"key", b"value2");
}
});
let db3 = db.clone();
let j3 = thread::spawn(move|| {
for i in 1..N {
match db3.get(b"key") {
Ok(Some(v)) => {
if &v[..] != b"value1" && &v[..] != b"value2" {
assert!(false);
}
}
_ => {
assert!(false);
}
}
}
});
j1.join();
j2.join();
j3.join();
}
assert!(DB::destroy(&Options::new(), path).is_ok());
}
Loading…
Cancel
Save