// Copyright 2020 Tyler Neely // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. mod util; use pretty_assertions::assert_eq; use rocksdb::{merge_operator::MergeFn, DBCompactionStyle, MergeOperands, Options, DB}; use serde::{Deserialize, Serialize}; use util::DBPath; fn test_provided_merge( _new_key: &[u8], existing_val: Option<&[u8]>, operands: &MergeOperands, ) -> Option> { let nops = operands.len(); let mut result: Vec = Vec::with_capacity(nops); if let Some(v) = existing_val { for e in v { result.push(*e); } } for op in operands { for e in op { result.push(*e); } } Some(result) } #[test] fn merge_test() { use crate::{Options, DB}; let db_path = DBPath::new("_rust_rocksdb_merge_test"); let mut opts = Options::default(); opts.create_if_missing(true); opts.set_merge_operator_associative("test operator", test_provided_merge); let db = DB::open(&opts, &db_path).unwrap(); let p = db.put(b"k1", b"a"); assert!(p.is_ok()); let _ = db.merge(b"k1", b"b"); let _ = db.merge(b"k1", b"c"); let _ = db.merge(b"k1", b"d"); let _ = db.merge(b"k1", b"efg"); let m = db.merge(b"k1", b"h"); assert!(m.is_ok()); match db.get(b"k1") { Ok(Some(value)) => { if let Ok(v) = std::str::from_utf8(&value) { println!("retrieved utf8 value: {v}") } else { println!("did not read valid utf-8 out of the db") } } Err(_) => println!("error reading value"), _ => panic!("value not present"), } assert!(m.is_ok()); let r = db.get(b"k1"); assert_eq!(r.unwrap().unwrap(), b"abcdefgh"); assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").unwrap().is_none()); } #[derive(Serialize, Deserialize, Copy, Clone, Debug, Default)] struct ValueCounts { num_a: u32, num_b: u32, num_c: u32, num_d: u32, } impl ValueCounts { fn from_slice(slice: &[u8]) -> Option { bincode::deserialize::(slice).ok() } fn as_bytes(&self) -> Option> { bincode::serialize(self).ok() } } fn test_counting_partial_merge( _new_key: &[u8], _existing_val: Option<&[u8]>, operands: &MergeOperands, ) -> Option> { let nops = operands.len(); let mut result: Vec = Vec::with_capacity(nops); for op in operands { for e in op { result.push(*e); } } Some(result) } fn test_counting_full_merge( _new_key: &[u8], existing_val: Option<&[u8]>, operands: &MergeOperands, ) -> Option> { let mut counts = existing_val .and_then(ValueCounts::from_slice) .unwrap_or_default(); for op in operands { for e in op { match *e { b'a' => counts.num_a += 1, b'b' => counts.num_b += 1, b'c' => counts.num_c += 1, b'd' => counts.num_d += 1, _ => {} } } } counts.as_bytes() } #[test] fn counting_merge_test() { use std::{sync::Arc, thread}; let db_path = DBPath::new("_rust_rocksdb_partial_merge_test"); let mut opts = Options::default(); opts.create_if_missing(true); opts.set_compaction_style(DBCompactionStyle::Universal); opts.set_min_write_buffer_number_to_merge(10); opts.set_merge_operator( "sort operator", test_counting_full_merge, test_counting_partial_merge, ); let db = Arc::new(DB::open(&opts, &db_path).unwrap()); let _ = db.delete(b"k1"); let _ = db.delete(b"k2"); let _ = db.merge(b"k1", b"a"); let _ = db.merge(b"k1", b"b"); let _ = db.merge(b"k1", b"d"); let _ = db.merge(b"k1", b"a"); let _ = db.merge(b"k1", b"a"); let _ = db.merge(b"k1", b"efg"); for i in 0..500 { let _ = db.merge(b"k2", b"c"); if i % 20 == 0 { let _ = db.get(b"k2"); } } for i in 0..500 { let _ = db.merge(b"k2", b"c"); if i % 20 == 0 { let _ = db.get(b"k2"); } } db.compact_range(None::<&[u8]>, None::<&[u8]>); let d1 = db.clone(); let d2 = db.clone(); let d3 = db.clone(); let h1 = thread::spawn(move || { for i in 0..500 { let _ = d1.merge(b"k2", b"c"); if i % 20 == 0 { let _ = d1.get(b"k2"); } } for i in 0..500 { let _ = d1.merge(b"k2", b"a"); if i % 20 == 0 { let _ = d1.get(b"k2"); } } }); let h2 = thread::spawn(move || { for i in 0..500 { let _ = d2.merge(b"k2", b"b"); if i % 20 == 0 { let _ = d2.get(b"k2"); } } for i in 0..500 { let _ = d2.merge(b"k2", b"d"); if i % 20 == 0 { let _ = d2.get(b"k2"); } } d2.compact_range(None::<&[u8]>, None::<&[u8]>); }); h2.join().unwrap(); let h3 = thread::spawn(move || { for i in 0..500 { let _ = d3.merge(b"k2", b"a"); if i % 20 == 0 { let _ = d3.get(b"k2"); } } for i in 0..500 { let _ = d3.merge(b"k2", b"c"); if i % 20 == 0 { let _ = d3.get(b"k2"); } } }); let m = db.merge(b"k1", b"b"); assert!(m.is_ok()); h3.join().unwrap(); h1.join().unwrap(); let value_getter = |key| match db.get(key) { Ok(Some(value)) => ValueCounts::from_slice(&value) .map_or_else(|| panic!("unable to create ValueCounts from bytes"), |v| v), Ok(None) => panic!("value not present"), Err(e) => panic!("error reading value {:?}", e), }; let counts = value_getter(b"k2"); assert_eq!(counts.num_a, 1000); assert_eq!(counts.num_b, 500); assert_eq!(counts.num_c, 2000); assert_eq!(counts.num_d, 500); let counts = value_getter(b"k1"); assert_eq!(counts.num_a, 3); assert_eq!(counts.num_b, 2); assert_eq!(counts.num_c, 0); assert_eq!(counts.num_d, 1); } #[test] fn failed_merge_test() { fn test_failing_merge( _key: &[u8], _val: Option<&[u8]>, _operands: &MergeOperands, ) -> Option> { None } use crate::{Options, DB}; let db_path = DBPath::new("_rust_rocksdb_failed_merge_test"); let mut opts = Options::default(); opts.create_if_missing(true); opts.set_merge_operator_associative("test operator", test_failing_merge); let db = DB::open(&opts, &db_path).expect("open with a merge operator"); db.put(b"key", b"value").expect("put_ok"); let res = db.merge(b"key", b"new value"); match res.and_then(|_e| db.get(b"key")) { Ok(val) => panic!("expected merge failure to propagate, got: {:?}", val), Err(e) => { assert!(e.into_string().contains("Could not perform merge.")); } } } fn make_merge_max_with_limit(limit: u64) -> impl MergeFn + Clone { move |_key: &[u8], first: Option<&[u8]>, rest: &MergeOperands| { let max = first .into_iter() .chain(rest) .map(|slice| { let mut bytes: [u8; 8] = Default::default(); bytes.clone_from_slice(slice); u64::from_ne_bytes(bytes) }) .fold(0, u64::max); let new_value = max.min(limit); Some(Vec::from(new_value.to_ne_bytes().as_ref())) } } #[test] fn test_merge_state() { use {Options, DB}; let path = "_rust_rocksdb_merge_test_state"; let mut opts = Options::default(); opts.create_if_missing(true); opts.set_merge_operator_associative("max-limit-12", make_merge_max_with_limit(12)); { let db = DB::open(&opts, path).unwrap(); let p = db.put(b"k1", 1u64.to_ne_bytes()); assert!(p.is_ok()); let _ = db.merge(b"k1", 7u64.to_ne_bytes()); let m = db.merge(b"k1", 64u64.to_ne_bytes()); assert!(m.is_ok()); match db.get(b"k1") { Ok(Some(value)) => { let mut bytes: [u8; 8] = Default::default(); bytes.copy_from_slice(&value); assert_eq!(u64::from_ne_bytes(bytes), 12); } Err(_) => println!("error reading value"), _ => panic!("value not present"), } assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").unwrap().is_none()); } assert!(DB::destroy(&opts, path).is_ok()); opts.set_merge_operator_associative("max-limit-128", make_merge_max_with_limit(128)); { let db = DB::open(&opts, path).unwrap(); let p = db.put(b"k1", 1u64.to_ne_bytes()); assert!(p.is_ok()); let _ = db.merge(b"k1", 7u64.to_ne_bytes()); let m = db.merge(b"k1", 64u64.to_ne_bytes()); assert!(m.is_ok()); match db.get(b"k1") { Ok(Some(value)) => { let mut bytes: [u8; 8] = Default::default(); bytes.copy_from_slice(&value); assert_eq!(u64::from_ne_bytes(bytes), 64); } Err(_) => println!("error reading value"), _ => panic!("value not present"), } assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").unwrap().is_none()); } assert!(DB::destroy(&opts, path).is_ok()); }