diff --git a/src/db.rs b/src/db.rs index 1b69521..c76df63 100644 --- a/src/db.rs +++ b/src/db.rs @@ -17,8 +17,8 @@ use crate::{ ffi, ffi_util::{opt_bytes_to_ptr, to_cpath}, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBPinnableSlice, DBRawIterator, - DBWALIterator, Direction, Error, FlushOptions, IteratorMode, Options, ReadOptions, Snapshot, - WriteBatch, WriteOptions, + DBWALIterator, Direction, Error, FlushOptions, IngestExternalFileOptions, IteratorMode, + Options, ReadOptions, Snapshot, WriteBatch, WriteOptions, }; use libc::{self, c_char, c_int, c_uchar, c_void, size_t}; @@ -1134,6 +1134,92 @@ impl DB { } Ok(()) } + + /// Loads a list of external SST files created with SstFileWriter into the DB with default opts + pub fn ingest_external_file>(&self, paths: Vec

) -> Result<(), Error> { + let opts = IngestExternalFileOptions::default(); + self.ingest_external_file_opts(&opts, paths) + } + + /// Loads a list of external SST files created with SstFileWriter into the DB + pub fn ingest_external_file_opts>( + &self, + opts: &IngestExternalFileOptions, + paths: Vec

, + ) -> Result<(), Error> { + let paths_v: Vec = paths + .iter() + .map(|path| to_cpath(&path)) + .collect::, _>>()?; + + let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect(); + + self.ingest_external_file_raw(&opts, &paths_v, &cpaths) + } + + /// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family + /// with default opts + pub fn ingest_external_file_cf>( + &self, + cf: &ColumnFamily, + paths: Vec

, + ) -> Result<(), Error> { + let opts = IngestExternalFileOptions::default(); + self.ingest_external_file_cf_opts(&cf, &opts, paths) + } + + /// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family + pub fn ingest_external_file_cf_opts>( + &self, + cf: &ColumnFamily, + opts: &IngestExternalFileOptions, + paths: Vec

, + ) -> Result<(), Error> { + let paths_v: Vec = paths + .iter() + .map(|path| to_cpath(&path)) + .collect::, _>>()?; + + let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect(); + + self.ingest_external_file_raw_cf(&cf, &opts, &paths_v, &cpaths) + } + + fn ingest_external_file_raw( + &self, + opts: &IngestExternalFileOptions, + paths_v: &[CString], + cpaths: &[*const c_char], + ) -> Result<(), Error> { + unsafe { + ffi_try!(ffi::rocksdb_ingest_external_file( + self.inner, + cpaths.as_ptr(), + paths_v.len(), + opts.inner as *const _ + )); + Ok(()) + } + } + + fn ingest_external_file_raw_cf( + &self, + cf: &ColumnFamily, + opts: &IngestExternalFileOptions, + paths_v: &[CString], + cpaths: &[*const c_char], + ) -> Result<(), Error> { + unsafe { + ffi_try!(ffi::rocksdb_ingest_external_file_cf( + self.inner, + cf.inner, + cpaths.as_ptr(), + paths_v.len(), + opts.inner as *const _ + )); + Ok(()) + } + } } impl Drop for DB { diff --git a/src/db_options.rs b/src/db_options.rs index d6890d9..9a870ae 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -134,6 +134,31 @@ pub struct ReadOptions { iterate_upper_bound: Option>, } +/// For configuring external files ingestion. +/// +/// # Examples +/// +/// Move files instead of copying them: +/// +/// ``` +/// use rocksdb::{DB, IngestExternalFileOptions, SstFileWriter, Options}; +/// +/// let writer_opts = Options::default(); +/// let mut writer = SstFileWriter::create(&writer_opts); +/// writer.open("_path_for_sst_file").unwrap(); +/// writer.put(b"k1", b"v1").unwrap(); +/// writer.finish().unwrap(); +/// +/// let path = "_path_for_rocksdb_storageY"; +/// let db = DB::open_default(&path).unwrap(); +/// let mut ingest_opts = IngestExternalFileOptions::default(); +/// ingest_opts.set_move_files(true); +/// db.ingest_external_file_opts(&ingest_opts, vec!["_path_for_sst_file"]).unwrap(); +/// ``` +pub struct IngestExternalFileOptions { + pub(crate) inner: *mut ffi::rocksdb_ingestexternalfileoptions_t, +} + // Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI // pointer. In most cases, however, this pointer is Send-safe because it is never aliased and // rocksdb internally does not rely on thread-local information for its user-exposed types. @@ -141,6 +166,7 @@ unsafe impl Send for Options {} unsafe impl Send for WriteOptions {} unsafe impl Send for BlockBasedOptions {} unsafe impl Send for ReadOptions {} +unsafe impl Send for IngestExternalFileOptions {} // Sync is similarly safe for many types because they do not expose interior mutability, and their // use within the rocksdb library is generally behind a const reference @@ -148,6 +174,7 @@ unsafe impl Sync for Options {} unsafe impl Sync for WriteOptions {} unsafe impl Sync for BlockBasedOptions {} unsafe impl Sync for ReadOptions {} +unsafe impl Sync for IngestExternalFileOptions {} impl Drop for Options { fn drop(&mut self) { @@ -187,6 +214,12 @@ impl Drop for ReadOptions { } } +impl Drop for IngestExternalFileOptions { + fn drop(&mut self) { + unsafe { ffi::rocksdb_ingestexternalfileoptions_destroy(self.inner) } + } +} + impl BlockBasedOptions { /// Approximate size of user data packed per block. Note that the /// block size specified here corresponds to uncompressed data. The @@ -1873,6 +1906,68 @@ impl Default for ReadOptions { } } +impl IngestExternalFileOptions { + /// Can be set to true to move the files instead of copying them. + pub fn set_move_files(&mut self, v: bool) { + unsafe { + ffi::rocksdb_ingestexternalfileoptions_set_move_files(self.inner, v as c_uchar); + } + } + + /// If set to false, an ingested file keys could appear in existing snapshots + /// that where created before the file was ingested. + pub fn set_snapshot_consistency(&mut self, v: bool) { + unsafe { + ffi::rocksdb_ingestexternalfileoptions_set_snapshot_consistency( + self.inner, + v as c_uchar, + ); + } + } + + /// If set to false, IngestExternalFile() will fail if the file key range + /// overlaps with existing keys or tombstones in the DB. + pub fn set_allow_global_seqno(&mut self, v: bool) { + unsafe { + ffi::rocksdb_ingestexternalfileoptions_set_allow_global_seqno(self.inner, v as c_uchar); + } + } + + /// If set to false and the file key range overlaps with the memtable key range + /// (memtable flush required), IngestExternalFile will fail. + pub fn set_allow_blocking_flush(&mut self, v: bool) { + unsafe { + ffi::rocksdb_ingestexternalfileoptions_set_allow_blocking_flush( + self.inner, + v as c_uchar, + ); + } + } + + /// Set to true if you would like duplicate keys in the file being ingested + /// to be skipped rather than overwriting existing data under that key. + /// Usecase: back-fill of some historical data in the database without + /// over-writing existing newer version of data. + /// This option could only be used if the DB has been running + /// with allow_ingest_behind=true since the dawn of time. + /// All files will be ingested at the bottommost level with seqno=0. + pub fn set_ingest_behind(&mut self, v: bool) { + unsafe { + ffi::rocksdb_ingestexternalfileoptions_set_ingest_behind(self.inner, v as c_uchar); + } + } +} + +impl Default for IngestExternalFileOptions { + fn default() -> IngestExternalFileOptions { + unsafe { + IngestExternalFileOptions { + inner: ffi::rocksdb_ingestexternalfileoptions_create(), + } + } + } +} + /// Used by BlockBasedOptions::set_index_type. pub enum BlockBasedIndexType { /// A space efficient index block that is optimized for diff --git a/src/lib.rs b/src/lib.rs index 279379a..1161997 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -86,6 +86,7 @@ mod db_pinnable_slice; pub mod merge_operator; mod slice_transform; mod snapshot; +mod sst_file_writer; mod write_batch; pub use crate::{ @@ -95,13 +96,14 @@ pub use crate::{ db_iterator::{DBIterator, DBRawIterator, DBWALIterator, Direction, IteratorMode}, db_options::{ BlockBasedIndexType, BlockBasedOptions, DBCompactionStyle, DBCompressionType, - DBRecoveryMode, DataBlockIndexType, FlushOptions, MemtableFactory, Options, - PlainTableFactoryOptions, ReadOptions, WriteOptions, + DBRecoveryMode, DataBlockIndexType, FlushOptions, IngestExternalFileOptions, + MemtableFactory, Options, PlainTableFactoryOptions, ReadOptions, WriteOptions, }, db_pinnable_slice::DBPinnableSlice, merge_operator::MergeOperands, slice_transform::SliceTransform, snapshot::Snapshot, + sst_file_writer::SstFileWriter, write_batch::{WriteBatch, WriteBatchIterator}, }; @@ -155,7 +157,8 @@ impl fmt::Display for Error { mod test { use super::{ BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, - Options, PlainTableFactoryOptions, ReadOptions, Snapshot, WriteOptions, DB, + IngestExternalFileOptions, Options, PlainTableFactoryOptions, ReadOptions, Snapshot, + SstFileWriter, WriteOptions, DB, }; #[test] @@ -174,10 +177,12 @@ mod test { is_send::(); is_send::(); is_send::(); + is_send::(); is_send::(); is_send::(); is_send::(); is_send::(); + is_send::(); } #[test] @@ -193,8 +198,10 @@ mod test { is_sync::(); is_sync::(); is_sync::(); + is_sync::(); is_sync::(); is_sync::(); is_sync::(); + is_sync::(); } } diff --git a/src/sst_file_writer.rs b/src/sst_file_writer.rs new file mode 100644 index 0000000..0678c9d --- /dev/null +++ b/src/sst_file_writer.rs @@ -0,0 +1,167 @@ +// Copyright 2020 Lucjan Suski +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//` + +use crate::{ffi, ffi_util::to_cpath, Error, Options}; + +use libc::{self, c_char, size_t}; +use std::{ffi::CString, marker::PhantomData, path::Path}; + +/// SstFileWriter is used to create sst files that can be added to database later +/// All keys in files generated by SstFileWriter will have sequence number = 0. +pub struct SstFileWriter<'a> { + pub(crate) inner: *mut ffi::rocksdb_sstfilewriter_t, + // Options are needed to be alive when calling open(), + // so let's make sure it doesn't get, dropped for the lifetime of SstFileWriter + phantom: PhantomData<&'a Options>, +} + +unsafe impl<'a> Send for SstFileWriter<'a> {} +unsafe impl<'a> Sync for SstFileWriter<'a> {} + +struct EnvOptions { + inner: *mut ffi::rocksdb_envoptions_t, +} + +impl Drop for EnvOptions { + fn drop(&mut self) { + unsafe { + ffi::rocksdb_envoptions_destroy(self.inner); + } + } +} + +impl Default for EnvOptions { + fn default() -> EnvOptions { + let opts = unsafe { ffi::rocksdb_envoptions_create() }; + EnvOptions { inner: opts } + } +} + +impl<'a> SstFileWriter<'a> { + /// Initializes SstFileWriter with given DB options. + pub fn create(opts: &'a Options) -> SstFileWriter { + let env_options = EnvOptions::default(); + + let writer = SstFileWriter::create_raw(opts, &env_options); + + SstFileWriter { + inner: writer, + phantom: PhantomData, + } + } + + fn create_raw(opts: &Options, env_opts: &EnvOptions) -> *mut ffi::rocksdb_sstfilewriter_t { + unsafe { ffi::rocksdb_sstfilewriter_create(env_opts.inner, opts.inner) } + } + + /// Prepare SstFileWriter to write into file located at "file_path". + pub fn open>(&'a self, path: P) -> Result<(), Error> { + let cpath = to_cpath(&path)?; + self.open_raw(&cpath) + } + + fn open_raw(&'a self, cpath: &CString) -> Result<(), Error> { + unsafe { + ffi_try!(ffi::rocksdb_sstfilewriter_open( + self.inner, + cpath.as_ptr() as *const _ + )); + + Ok(()) + } + } + + /// Finalize writing to sst file and close file. + pub fn finish(&mut self) -> Result<(), Error> { + unsafe { + ffi_try!(ffi::rocksdb_sstfilewriter_finish(self.inner,)); + Ok(()) + } + } + + /// returns the current file size + pub fn file_size(&self) -> u64 { + let mut file_size: u64 = 0; + unsafe { ffi::rocksdb_sstfilewriter_file_size(self.inner, &mut file_size) }; + file_size + } + + /// Adds a Put key with value to currently opened file + /// REQUIRES: key is after any previously added key according to comparator. + pub fn put(&mut self, key: K, value: V) -> Result<(), Error> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + let key = key.as_ref(); + let value = value.as_ref(); + + unsafe { + ffi_try!(ffi::rocksdb_sstfilewriter_put( + self.inner, + key.as_ptr() as *const c_char, + key.len() as size_t, + value.as_ptr() as *const c_char, + value.len() as size_t, + )); + Ok(()) + } + } + + /// Adds a Merge key with value to currently opened file + /// REQUIRES: key is after any previously added key according to comparator. + pub fn merge(&mut self, key: K, value: V) -> Result<(), Error> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + let key = key.as_ref(); + let value = value.as_ref(); + + unsafe { + ffi_try!(ffi::rocksdb_sstfilewriter_merge( + self.inner, + key.as_ptr() as *const c_char, + key.len() as size_t, + value.as_ptr() as *const c_char, + value.len() as size_t, + )); + Ok(()) + } + } + + /// Adds a deletion key to currently opened file + /// REQUIRES: key is after any previously added key according to comparator. + pub fn delete>(&mut self, key: K) -> Result<(), Error> { + let key = key.as_ref(); + + unsafe { + ffi_try!(ffi::rocksdb_sstfilewriter_delete( + self.inner, + key.as_ptr() as *const c_char, + key.len() as size_t, + )); + Ok(()) + } + } +} + +impl<'a> Drop for SstFileWriter<'a> { + fn drop(&mut self) { + unsafe { + ffi::rocksdb_sstfilewriter_destroy(self.inner); + } + } +} diff --git a/tests/test_sst_file_writer.rs b/tests/test_sst_file_writer.rs new file mode 100644 index 0000000..43b0f8b --- /dev/null +++ b/tests/test_sst_file_writer.rs @@ -0,0 +1,51 @@ +// Copyright 2020 Lucjan Suski +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod util; + +use rocksdb::{Error, Options, SstFileWriter, DB}; + +use util::DBPath; + +#[test] +fn sst_file_writer_works() { + let db_path = DBPath::new("_rust_rocksdb_sstfilewritertest"); + let dir = tempfile::Builder::new() + .prefix("_rust_rocksdb_sstfilewritertest") + .tempdir() + .expect("Failed to create temporary path for file writer."); + let writer_path = dir.path().join("filewriter"); + { + let opts = Options::default(); + let mut writer = SstFileWriter::create(&opts); + writer.open(&writer_path).unwrap(); + writer.put(b"k1", b"v1").unwrap(); + + writer.put(b"k2", b"v2").unwrap(); + + writer.delete(b"k3").unwrap(); + writer.finish().unwrap(); + assert!(writer.file_size() > 0); + } + { + let db = DB::open_default(&db_path).unwrap(); + db.put(b"k3", b"v3").unwrap(); + db.ingest_external_file(vec![&writer_path]).unwrap(); + let r: Result>, Error> = db.get(b"k1"); + assert_eq!(r.unwrap().unwrap(), b"v1"); + let r: Result>, Error> = db.get(b"k2"); + assert_eq!(r.unwrap().unwrap(), b"v2"); + assert!(db.get(b"k3").unwrap().is_none()); + } +}