From bc07d0cd2818ab09c73f86b942d667855c6a5a40 Mon Sep 17 00:00:00 2001 From: Nelson Elhage Date: Mon, 15 Jun 2020 23:39:02 -0700 Subject: [PATCH] Don't segfault on failed a merge operator. (#437) --- src/merge_operator.rs | 258 +------------------------------ tests/test_merge_operator.rs | 290 +++++++++++++++++++++++++++++++++++ 2 files changed, 292 insertions(+), 256 deletions(-) create mode 100644 tests/test_merge_operator.rs diff --git a/src/merge_operator.rs b/src/merge_operator.rs index a47faea..0ef12c4 100644 --- a/src/merge_operator.rs +++ b/src/merge_operator.rs @@ -111,6 +111,7 @@ pub unsafe extern "C" fn full_merge_callback( ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len()); buf as *mut c_char } else { + *new_value_length = 0; *success = 0 as u8; ptr::null_mut() as *mut c_char } @@ -139,6 +140,7 @@ pub unsafe extern "C" fn partial_merge_callback( ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len()); buf as *mut c_char } else { + *new_value_length = 0; *success = 0 as u8; ptr::null_mut::() } @@ -196,259 +198,3 @@ impl<'a> Iterator for &'a mut MergeOperands { (remaining, Some(remaining)) } } - -#[cfg(test)] -mod test { - - use super::MergeOperands; - - fn test_provided_merge( - _new_key: &[u8], - existing_val: Option<&[u8]>, - operands: &mut MergeOperands, - ) -> Option> { - let nops = operands.size_hint().0; - let mut result: Vec = Vec::with_capacity(nops); - if let Some(v) = existing_val { - for e in v { - result.push(*e); - } - } - for op in operands { - for e in op { - result.push(*e); - } - } - Some(result) - } - - #[test] - fn merge_test() { - use crate::{Options, DB}; - - let path = "_rust_rocksdb_merge_test"; - let mut opts = Options::default(); - opts.create_if_missing(true); - opts.set_merge_operator("test operator", test_provided_merge, None); - { - let db = DB::open(&opts, path).unwrap(); - let p = db.put(b"k1", b"a"); - assert!(p.is_ok()); - let _ = db.merge(b"k1", b"b"); - let _ = db.merge(b"k1", b"c"); - let _ = db.merge(b"k1", b"d"); - let _ = db.merge(b"k1", b"efg"); - let m = db.merge(b"k1", b"h"); - assert!(m.is_ok()); - match db.get(b"k1") { - Ok(Some(value)) => { - if let Ok(v) = std::str::from_utf8(&value) { - println!("retrieved utf8 value: {}", v) - } else { - println!("did not read valid utf-8 out of the db") - } - } - Err(_) => println!("error reading value"), - _ => panic!("value not present"), - } - - assert!(m.is_ok()); - let r = db.get(b"k1"); - assert_eq!(r.unwrap().unwrap(), b"abcdefgh"); - assert!(db.delete(b"k1").is_ok()); - assert!(db.get(b"k1").unwrap().is_none()); - } - assert!(DB::destroy(&opts, path).is_ok()); - } - - unsafe fn to_slice(p: &T) -> &[u8] { - ::std::slice::from_raw_parts((p as *const T) as *const u8, ::std::mem::size_of::()) - } - - fn from_slice(s: &[u8]) -> Option<&T> { - if std::mem::size_of::() == s.len() { - unsafe { Some(&*(s.as_ptr() as *const T)) } - } else { - println!( - "slice {:?} is len {}, but T is size {}", - s, - s.len(), - std::mem::size_of::() - ); - None - } - } - - #[repr(packed)] - #[derive(Copy, Clone, Debug, Default)] - struct ValueCounts { - num_a: u32, - num_b: u32, - num_c: u32, - num_d: u32, - } - - fn test_counting_partial_merge( - _new_key: &[u8], - _existing_val: Option<&[u8]>, - operands: &mut MergeOperands, - ) -> Option> { - let nops = operands.size_hint().0; - let mut result: Vec = Vec::with_capacity(nops); - for op in operands { - for e in op { - result.push(*e); - } - } - Some(result) - } - - fn test_counting_full_merge( - _new_key: &[u8], - existing_val: Option<&[u8]>, - operands: &mut MergeOperands, - ) -> Option> { - let mut counts = if let Some(v) = existing_val { - *from_slice::(v).unwrap_or(&ValueCounts::default()) - } else { - ValueCounts::default() - }; - - for op in operands { - for e in op { - match *e { - b'a' => counts.num_a += 1, - b'b' => counts.num_b += 1, - b'c' => counts.num_c += 1, - b'd' => counts.num_d += 1, - _ => {} - } - } - } - let slc = unsafe { to_slice(&counts) }; - Some(slc.to_vec()) - } - - #[test] - #[allow(clippy::too_many_lines)] - fn counting_merge_test() { - use crate::{DBCompactionStyle, Options, DB}; - use std::sync::Arc; - use std::thread; - - let path = "_rust_rocksdb_partial_merge_test"; - let mut opts = Options::default(); - opts.create_if_missing(true); - opts.set_compaction_style(DBCompactionStyle::Universal); - opts.set_min_write_buffer_number_to_merge(10); - - opts.set_merge_operator( - "sort operator", - test_counting_full_merge, - Some(test_counting_partial_merge), - ); - { - let db = Arc::new(DB::open(&opts, path).unwrap()); - let _ = db.delete(b"k1"); - let _ = db.delete(b"k2"); - let _ = db.merge(b"k1", b"a"); - let _ = db.merge(b"k1", b"b"); - let _ = db.merge(b"k1", b"d"); - let _ = db.merge(b"k1", b"a"); - let _ = db.merge(b"k1", b"a"); - let _ = db.merge(b"k1", b"efg"); - for i in 0..500 { - let _ = db.merge(b"k2", b"c"); - if i % 20 == 0 { - let _ = db.get(b"k2"); - } - } - for i in 0..500 { - let _ = db.merge(b"k2", b"c"); - if i % 20 == 0 { - let _ = db.get(b"k2"); - } - } - db.compact_range(None::<&[u8]>, None::<&[u8]>); - let d1 = db.clone(); - let d2 = db.clone(); - let d3 = db.clone(); - - let h1 = thread::spawn(move || { - for i in 0..500 { - let _ = d1.merge(b"k2", b"c"); - if i % 20 == 0 { - let _ = d1.get(b"k2"); - } - } - for i in 0..500 { - let _ = d1.merge(b"k2", b"a"); - if i % 20 == 0 { - let _ = d1.get(b"k2"); - } - } - }); - let h2 = thread::spawn(move || { - for i in 0..500 { - let _ = d2.merge(b"k2", b"b"); - if i % 20 == 0 { - let _ = d2.get(b"k2"); - } - } - for i in 0..500 { - let _ = d2.merge(b"k2", b"d"); - if i % 20 == 0 { - let _ = d2.get(b"k2"); - } - } - d2.compact_range(None::<&[u8]>, None::<&[u8]>); - }); - h2.join().unwrap(); - let h3 = thread::spawn(move || { - for i in 0..500 { - let _ = d3.merge(b"k2", b"a"); - if i % 20 == 0 { - let _ = d3.get(b"k2"); - } - } - for i in 0..500 { - let _ = d3.merge(b"k2", b"c"); - if i % 20 == 0 { - let _ = d3.get(b"k2"); - } - } - }); - let m = db.merge(b"k1", b"b"); - assert!(m.is_ok()); - h3.join().unwrap(); - h1.join().unwrap(); - match db.get(b"k2") { - Ok(Some(value)) => match from_slice::(&*value) { - Some(v) => unsafe { - assert_eq!(v.num_a, 1000); - assert_eq!(v.num_b, 500); - assert_eq!(v.num_c, 2000); - assert_eq!(v.num_d, 500); - }, - None => panic!("Failed to get ValueCounts from db"), - }, - Err(e) => panic!("error reading value {:?}", e), - _ => panic!("value not present"), - } - match db.get(b"k1") { - Ok(Some(value)) => match from_slice::(&*value) { - Some(v) => unsafe { - assert_eq!(v.num_a, 3); - assert_eq!(v.num_b, 2); - assert_eq!(v.num_c, 0); - assert_eq!(v.num_d, 1); - }, - None => panic!("Failed to get ValueCounts from db"), - }, - Err(e) => panic!("error reading value {:?}", e), - _ => panic!("value not present"), - } - } - assert!(DB::destroy(&opts, path).is_ok()); - } -} diff --git a/tests/test_merge_operator.rs b/tests/test_merge_operator.rs new file mode 100644 index 0000000..de544da --- /dev/null +++ b/tests/test_merge_operator.rs @@ -0,0 +1,290 @@ +// Copyright 2020 Tyler Neely +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod util; + +use rocksdb::{DBCompactionStyle, MergeOperands, Options, DB}; +use util::DBPath; + +fn test_provided_merge( + _new_key: &[u8], + existing_val: Option<&[u8]>, + operands: &mut MergeOperands, +) -> Option> { + let nops = operands.size_hint().0; + let mut result: Vec = Vec::with_capacity(nops); + if let Some(v) = existing_val { + for e in v { + result.push(*e); + } + } + for op in operands { + for e in op { + result.push(*e); + } + } + Some(result) +} + +#[test] +fn merge_test() { + use crate::{Options, DB}; + + let db_path = DBPath::new("_rust_rocksdb_merge_test"); + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_merge_operator("test operator", test_provided_merge, None); + + let db = DB::open(&opts, &db_path).unwrap(); + let p = db.put(b"k1", b"a"); + assert!(p.is_ok()); + let _ = db.merge(b"k1", b"b"); + let _ = db.merge(b"k1", b"c"); + let _ = db.merge(b"k1", b"d"); + let _ = db.merge(b"k1", b"efg"); + let m = db.merge(b"k1", b"h"); + assert!(m.is_ok()); + match db.get(b"k1") { + Ok(Some(value)) => { + if let Ok(v) = std::str::from_utf8(&value) { + println!("retrieved utf8 value: {}", v) + } else { + println!("did not read valid utf-8 out of the db") + } + } + Err(_) => println!("error reading value"), + _ => panic!("value not present"), + } + + assert!(m.is_ok()); + let r = db.get(b"k1"); + assert_eq!(r.unwrap().unwrap(), b"abcdefgh"); + assert!(db.delete(b"k1").is_ok()); + assert!(db.get(b"k1").unwrap().is_none()); +} + +unsafe fn to_slice(p: &T) -> &[u8] { + ::std::slice::from_raw_parts((p as *const T) as *const u8, ::std::mem::size_of::()) +} + +fn from_slice(s: &[u8]) -> Option<&T> { + if std::mem::size_of::() == s.len() { + unsafe { Some(&*(s.as_ptr() as *const T)) } + } else { + println!( + "slice {:?} is len {}, but T is size {}", + s, + s.len(), + std::mem::size_of::() + ); + None + } +} + +#[repr(packed)] +#[derive(Copy, Clone, Debug, Default)] +struct ValueCounts { + num_a: u32, + num_b: u32, + num_c: u32, + num_d: u32, +} + +fn test_counting_partial_merge( + _new_key: &[u8], + _existing_val: Option<&[u8]>, + operands: &mut MergeOperands, +) -> Option> { + let nops = operands.size_hint().0; + let mut result: Vec = Vec::with_capacity(nops); + for op in operands { + for e in op { + result.push(*e); + } + } + Some(result) +} + +fn test_counting_full_merge( + _new_key: &[u8], + existing_val: Option<&[u8]>, + operands: &mut MergeOperands, +) -> Option> { + let mut counts = if let Some(v) = existing_val { + *from_slice::(v).unwrap_or(&ValueCounts::default()) + } else { + ValueCounts::default() + }; + + for op in operands { + for e in op { + match *e { + b'a' => counts.num_a += 1, + b'b' => counts.num_b += 1, + b'c' => counts.num_c += 1, + b'd' => counts.num_d += 1, + _ => {} + } + } + } + let slc = unsafe { to_slice(&counts) }; + Some(slc.to_vec()) +} + +#[test] +#[allow(clippy::too_many_lines)] +fn counting_merge_test() { + use std::sync::Arc; + use std::thread; + + let db_path = DBPath::new("_rust_rocksdb_partial_merge_test"); + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_compaction_style(DBCompactionStyle::Universal); + opts.set_min_write_buffer_number_to_merge(10); + + opts.set_merge_operator( + "sort operator", + test_counting_full_merge, + Some(test_counting_partial_merge), + ); + + let db = Arc::new(DB::open(&opts, &db_path).unwrap()); + let _ = db.delete(b"k1"); + let _ = db.delete(b"k2"); + let _ = db.merge(b"k1", b"a"); + let _ = db.merge(b"k1", b"b"); + let _ = db.merge(b"k1", b"d"); + let _ = db.merge(b"k1", b"a"); + let _ = db.merge(b"k1", b"a"); + let _ = db.merge(b"k1", b"efg"); + for i in 0..500 { + let _ = db.merge(b"k2", b"c"); + if i % 20 == 0 { + let _ = db.get(b"k2"); + } + } + for i in 0..500 { + let _ = db.merge(b"k2", b"c"); + if i % 20 == 0 { + let _ = db.get(b"k2"); + } + } + db.compact_range(None::<&[u8]>, None::<&[u8]>); + let d1 = db.clone(); + let d2 = db.clone(); + let d3 = db.clone(); + + let h1 = thread::spawn(move || { + for i in 0..500 { + let _ = d1.merge(b"k2", b"c"); + if i % 20 == 0 { + let _ = d1.get(b"k2"); + } + } + for i in 0..500 { + let _ = d1.merge(b"k2", b"a"); + if i % 20 == 0 { + let _ = d1.get(b"k2"); + } + } + }); + let h2 = thread::spawn(move || { + for i in 0..500 { + let _ = d2.merge(b"k2", b"b"); + if i % 20 == 0 { + let _ = d2.get(b"k2"); + } + } + for i in 0..500 { + let _ = d2.merge(b"k2", b"d"); + if i % 20 == 0 { + let _ = d2.get(b"k2"); + } + } + d2.compact_range(None::<&[u8]>, None::<&[u8]>); + }); + h2.join().unwrap(); + let h3 = thread::spawn(move || { + for i in 0..500 { + let _ = d3.merge(b"k2", b"a"); + if i % 20 == 0 { + let _ = d3.get(b"k2"); + } + } + for i in 0..500 { + let _ = d3.merge(b"k2", b"c"); + if i % 20 == 0 { + let _ = d3.get(b"k2"); + } + } + }); + let m = db.merge(b"k1", b"b"); + assert!(m.is_ok()); + h3.join().unwrap(); + h1.join().unwrap(); + match db.get(b"k2") { + Ok(Some(value)) => match from_slice::(&*value) { + Some(v) => unsafe { + assert_eq!(v.num_a, 1000); + assert_eq!(v.num_b, 500); + assert_eq!(v.num_c, 2000); + assert_eq!(v.num_d, 500); + }, + None => panic!("Failed to get ValueCounts from db"), + }, + Err(e) => panic!("error reading value {:?}", e), + _ => panic!("value not present"), + } + match db.get(b"k1") { + Ok(Some(value)) => match from_slice::(&*value) { + Some(v) => unsafe { + assert_eq!(v.num_a, 3); + assert_eq!(v.num_b, 2); + assert_eq!(v.num_c, 0); + assert_eq!(v.num_d, 1); + }, + None => panic!("Failed to get ValueCounts from db"), + }, + Err(e) => panic!("error reading value {:?}", e), + _ => panic!("value not present"), + } +} + +#[test] +fn failed_merge_test() { + fn test_failing_merge( + _key: &[u8], + _val: Option<&[u8]>, + _operands: &mut MergeOperands, + ) -> Option> { + None + } + use crate::{Options, DB}; + + let db_path = DBPath::new("_rust_rocksdb_failed_merge_test"); + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_merge_operator("test operator", test_failing_merge, None); + + let db = DB::open(&opts, &db_path).expect("open with a merge operator"); + db.put(b"key", b"value").expect("put_ok"); + let res = db.merge(b"key", b"new value"); + match res.and_then(|_e| db.get(b"key")) { + Ok(val) => panic!("expected merge failure to propagate, got: {:?}", val), + Err(e) => { + assert!(e.into_string().contains("Could not perform merge.")); + } + } +}