Add support for SstFileWriter and DB::ingest_external_file (#421)

master
Lucjan Suski 5 years ago committed by GitHub
parent c70b59de55
commit b5d69fde69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 90
      src/db.rs
  2. 95
      src/db_options.rs
  3. 13
      src/lib.rs
  4. 167
      src/sst_file_writer.rs
  5. 51
      tests/test_sst_file_writer.rs

@ -17,8 +17,8 @@ use crate::{
ffi,
ffi_util::{opt_bytes_to_ptr, to_cpath},
ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBPinnableSlice, DBRawIterator,
DBWALIterator, Direction, Error, FlushOptions, IteratorMode, Options, ReadOptions, Snapshot,
WriteBatch, WriteOptions,
DBWALIterator, Direction, Error, FlushOptions, IngestExternalFileOptions, IteratorMode,
Options, ReadOptions, Snapshot, WriteBatch, WriteOptions,
};
use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
@ -1134,6 +1134,92 @@ impl DB {
}
Ok(())
}
/// Loads a list of external SST files created with SstFileWriter into the DB with default opts
pub fn ingest_external_file<P: AsRef<Path>>(&self, paths: Vec<P>) -> Result<(), Error> {
let opts = IngestExternalFileOptions::default();
self.ingest_external_file_opts(&opts, paths)
}
/// Loads a list of external SST files created with SstFileWriter into the DB
pub fn ingest_external_file_opts<P: AsRef<Path>>(
&self,
opts: &IngestExternalFileOptions,
paths: Vec<P>,
) -> Result<(), Error> {
let paths_v: Vec<CString> = paths
.iter()
.map(|path| to_cpath(&path))
.collect::<Result<Vec<_>, _>>()?;
let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
self.ingest_external_file_raw(&opts, &paths_v, &cpaths)
}
/// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family
/// with default opts
pub fn ingest_external_file_cf<P: AsRef<Path>>(
&self,
cf: &ColumnFamily,
paths: Vec<P>,
) -> Result<(), Error> {
let opts = IngestExternalFileOptions::default();
self.ingest_external_file_cf_opts(&cf, &opts, paths)
}
/// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family
pub fn ingest_external_file_cf_opts<P: AsRef<Path>>(
&self,
cf: &ColumnFamily,
opts: &IngestExternalFileOptions,
paths: Vec<P>,
) -> Result<(), Error> {
let paths_v: Vec<CString> = paths
.iter()
.map(|path| to_cpath(&path))
.collect::<Result<Vec<_>, _>>()?;
let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
self.ingest_external_file_raw_cf(&cf, &opts, &paths_v, &cpaths)
}
fn ingest_external_file_raw(
&self,
opts: &IngestExternalFileOptions,
paths_v: &[CString],
cpaths: &[*const c_char],
) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_ingest_external_file(
self.inner,
cpaths.as_ptr(),
paths_v.len(),
opts.inner as *const _
));
Ok(())
}
}
fn ingest_external_file_raw_cf(
&self,
cf: &ColumnFamily,
opts: &IngestExternalFileOptions,
paths_v: &[CString],
cpaths: &[*const c_char],
) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_ingest_external_file_cf(
self.inner,
cf.inner,
cpaths.as_ptr(),
paths_v.len(),
opts.inner as *const _
));
Ok(())
}
}
}
impl Drop for DB {

@ -134,6 +134,31 @@ pub struct ReadOptions {
iterate_upper_bound: Option<Vec<u8>>,
}
/// For configuring external files ingestion.
///
/// # Examples
///
/// Move files instead of copying them:
///
/// ```
/// use rocksdb::{DB, IngestExternalFileOptions, SstFileWriter, Options};
///
/// let writer_opts = Options::default();
/// let mut writer = SstFileWriter::create(&writer_opts);
/// writer.open("_path_for_sst_file").unwrap();
/// writer.put(b"k1", b"v1").unwrap();
/// writer.finish().unwrap();
///
/// let path = "_path_for_rocksdb_storageY";
/// let db = DB::open_default(&path).unwrap();
/// let mut ingest_opts = IngestExternalFileOptions::default();
/// ingest_opts.set_move_files(true);
/// db.ingest_external_file_opts(&ingest_opts, vec!["_path_for_sst_file"]).unwrap();
/// ```
pub struct IngestExternalFileOptions {
pub(crate) inner: *mut ffi::rocksdb_ingestexternalfileoptions_t,
}
// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI
// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and
// rocksdb internally does not rely on thread-local information for its user-exposed types.
@ -141,6 +166,7 @@ unsafe impl Send for Options {}
unsafe impl Send for WriteOptions {}
unsafe impl Send for BlockBasedOptions {}
unsafe impl Send for ReadOptions {}
unsafe impl Send for IngestExternalFileOptions {}
// Sync is similarly safe for many types because they do not expose interior mutability, and their
// use within the rocksdb library is generally behind a const reference
@ -148,6 +174,7 @@ unsafe impl Sync for Options {}
unsafe impl Sync for WriteOptions {}
unsafe impl Sync for BlockBasedOptions {}
unsafe impl Sync for ReadOptions {}
unsafe impl Sync for IngestExternalFileOptions {}
impl Drop for Options {
fn drop(&mut self) {
@ -187,6 +214,12 @@ impl Drop for ReadOptions {
}
}
impl Drop for IngestExternalFileOptions {
fn drop(&mut self) {
unsafe { ffi::rocksdb_ingestexternalfileoptions_destroy(self.inner) }
}
}
impl BlockBasedOptions {
/// Approximate size of user data packed per block. Note that the
/// block size specified here corresponds to uncompressed data. The
@ -1873,6 +1906,68 @@ impl Default for ReadOptions {
}
}
impl IngestExternalFileOptions {
/// Can be set to true to move the files instead of copying them.
pub fn set_move_files(&mut self, v: bool) {
unsafe {
ffi::rocksdb_ingestexternalfileoptions_set_move_files(self.inner, v as c_uchar);
}
}
/// If set to false, an ingested file keys could appear in existing snapshots
/// that where created before the file was ingested.
pub fn set_snapshot_consistency(&mut self, v: bool) {
unsafe {
ffi::rocksdb_ingestexternalfileoptions_set_snapshot_consistency(
self.inner,
v as c_uchar,
);
}
}
/// If set to false, IngestExternalFile() will fail if the file key range
/// overlaps with existing keys or tombstones in the DB.
pub fn set_allow_global_seqno(&mut self, v: bool) {
unsafe {
ffi::rocksdb_ingestexternalfileoptions_set_allow_global_seqno(self.inner, v as c_uchar);
}
}
/// If set to false and the file key range overlaps with the memtable key range
/// (memtable flush required), IngestExternalFile will fail.
pub fn set_allow_blocking_flush(&mut self, v: bool) {
unsafe {
ffi::rocksdb_ingestexternalfileoptions_set_allow_blocking_flush(
self.inner,
v as c_uchar,
);
}
}
/// Set to true if you would like duplicate keys in the file being ingested
/// to be skipped rather than overwriting existing data under that key.
/// Usecase: back-fill of some historical data in the database without
/// over-writing existing newer version of data.
/// This option could only be used if the DB has been running
/// with allow_ingest_behind=true since the dawn of time.
/// All files will be ingested at the bottommost level with seqno=0.
pub fn set_ingest_behind(&mut self, v: bool) {
unsafe {
ffi::rocksdb_ingestexternalfileoptions_set_ingest_behind(self.inner, v as c_uchar);
}
}
}
impl Default for IngestExternalFileOptions {
fn default() -> IngestExternalFileOptions {
unsafe {
IngestExternalFileOptions {
inner: ffi::rocksdb_ingestexternalfileoptions_create(),
}
}
}
}
/// Used by BlockBasedOptions::set_index_type.
pub enum BlockBasedIndexType {
/// A space efficient index block that is optimized for

@ -86,6 +86,7 @@ mod db_pinnable_slice;
pub mod merge_operator;
mod slice_transform;
mod snapshot;
mod sst_file_writer;
mod write_batch;
pub use crate::{
@ -95,13 +96,14 @@ pub use crate::{
db_iterator::{DBIterator, DBRawIterator, DBWALIterator, Direction, IteratorMode},
db_options::{
BlockBasedIndexType, BlockBasedOptions, DBCompactionStyle, DBCompressionType,
DBRecoveryMode, DataBlockIndexType, FlushOptions, MemtableFactory, Options,
PlainTableFactoryOptions, ReadOptions, WriteOptions,
DBRecoveryMode, DataBlockIndexType, FlushOptions, IngestExternalFileOptions,
MemtableFactory, Options, PlainTableFactoryOptions, ReadOptions, WriteOptions,
},
db_pinnable_slice::DBPinnableSlice,
merge_operator::MergeOperands,
slice_transform::SliceTransform,
snapshot::Snapshot,
sst_file_writer::SstFileWriter,
write_batch::{WriteBatch, WriteBatchIterator},
};
@ -155,7 +157,8 @@ impl fmt::Display for Error {
mod test {
use super::{
BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator,
Options, PlainTableFactoryOptions, ReadOptions, Snapshot, WriteOptions, DB,
IngestExternalFileOptions, Options, PlainTableFactoryOptions, ReadOptions, Snapshot,
SstFileWriter, WriteOptions, DB,
};
#[test]
@ -174,10 +177,12 @@ mod test {
is_send::<Options>();
is_send::<ReadOptions>();
is_send::<WriteOptions>();
is_send::<IngestExternalFileOptions>();
is_send::<BlockBasedOptions>();
is_send::<PlainTableFactoryOptions>();
is_send::<ColumnFamilyDescriptor>();
is_send::<ColumnFamily>();
is_send::<SstFileWriter>();
}
#[test]
@ -193,8 +198,10 @@ mod test {
is_sync::<Options>();
is_sync::<ReadOptions>();
is_sync::<WriteOptions>();
is_sync::<IngestExternalFileOptions>();
is_sync::<BlockBasedOptions>();
is_sync::<PlainTableFactoryOptions>();
is_sync::<ColumnFamilyDescriptor>();
is_sync::<SstFileWriter>();
}
}

@ -0,0 +1,167 @@
// Copyright 2020 Lucjan Suski
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//`
use crate::{ffi, ffi_util::to_cpath, Error, Options};
use libc::{self, c_char, size_t};
use std::{ffi::CString, marker::PhantomData, path::Path};
/// SstFileWriter is used to create sst files that can be added to database later
/// All keys in files generated by SstFileWriter will have sequence number = 0.
pub struct SstFileWriter<'a> {
pub(crate) inner: *mut ffi::rocksdb_sstfilewriter_t,
// Options are needed to be alive when calling open(),
// so let's make sure it doesn't get, dropped for the lifetime of SstFileWriter
phantom: PhantomData<&'a Options>,
}
unsafe impl<'a> Send for SstFileWriter<'a> {}
unsafe impl<'a> Sync for SstFileWriter<'a> {}
struct EnvOptions {
inner: *mut ffi::rocksdb_envoptions_t,
}
impl Drop for EnvOptions {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_envoptions_destroy(self.inner);
}
}
}
impl Default for EnvOptions {
fn default() -> EnvOptions {
let opts = unsafe { ffi::rocksdb_envoptions_create() };
EnvOptions { inner: opts }
}
}
impl<'a> SstFileWriter<'a> {
/// Initializes SstFileWriter with given DB options.
pub fn create(opts: &'a Options) -> SstFileWriter {
let env_options = EnvOptions::default();
let writer = SstFileWriter::create_raw(opts, &env_options);
SstFileWriter {
inner: writer,
phantom: PhantomData,
}
}
fn create_raw(opts: &Options, env_opts: &EnvOptions) -> *mut ffi::rocksdb_sstfilewriter_t {
unsafe { ffi::rocksdb_sstfilewriter_create(env_opts.inner, opts.inner) }
}
/// Prepare SstFileWriter to write into file located at "file_path".
pub fn open<P: AsRef<Path>>(&'a self, path: P) -> Result<(), Error> {
let cpath = to_cpath(&path)?;
self.open_raw(&cpath)
}
fn open_raw(&'a self, cpath: &CString) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_sstfilewriter_open(
self.inner,
cpath.as_ptr() as *const _
));
Ok(())
}
}
/// Finalize writing to sst file and close file.
pub fn finish(&mut self) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_sstfilewriter_finish(self.inner,));
Ok(())
}
}
/// returns the current file size
pub fn file_size(&self) -> u64 {
let mut file_size: u64 = 0;
unsafe { ffi::rocksdb_sstfilewriter_file_size(self.inner, &mut file_size) };
file_size
}
/// Adds a Put key with value to currently opened file
/// REQUIRES: key is after any previously added key according to comparator.
pub fn put<K, V>(&mut self, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let key = key.as_ref();
let value = value.as_ref();
unsafe {
ffi_try!(ffi::rocksdb_sstfilewriter_put(
self.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t,
));
Ok(())
}
}
/// Adds a Merge key with value to currently opened file
/// REQUIRES: key is after any previously added key according to comparator.
pub fn merge<K, V>(&mut self, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let key = key.as_ref();
let value = value.as_ref();
unsafe {
ffi_try!(ffi::rocksdb_sstfilewriter_merge(
self.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t,
));
Ok(())
}
}
/// Adds a deletion key to currently opened file
/// REQUIRES: key is after any previously added key according to comparator.
pub fn delete<K: AsRef<[u8]>>(&mut self, key: K) -> Result<(), Error> {
let key = key.as_ref();
unsafe {
ffi_try!(ffi::rocksdb_sstfilewriter_delete(
self.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
));
Ok(())
}
}
}
impl<'a> Drop for SstFileWriter<'a> {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_sstfilewriter_destroy(self.inner);
}
}
}

@ -0,0 +1,51 @@
// Copyright 2020 Lucjan Suski
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod util;
use rocksdb::{Error, Options, SstFileWriter, DB};
use util::DBPath;
#[test]
fn sst_file_writer_works() {
let db_path = DBPath::new("_rust_rocksdb_sstfilewritertest");
let dir = tempfile::Builder::new()
.prefix("_rust_rocksdb_sstfilewritertest")
.tempdir()
.expect("Failed to create temporary path for file writer.");
let writer_path = dir.path().join("filewriter");
{
let opts = Options::default();
let mut writer = SstFileWriter::create(&opts);
writer.open(&writer_path).unwrap();
writer.put(b"k1", b"v1").unwrap();
writer.put(b"k2", b"v2").unwrap();
writer.delete(b"k3").unwrap();
writer.finish().unwrap();
assert!(writer.file_size() > 0);
}
{
let db = DB::open_default(&db_path).unwrap();
db.put(b"k3", b"v3").unwrap();
db.ingest_external_file(vec![&writer_path]).unwrap();
let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1");
assert_eq!(r.unwrap().unwrap(), b"v1");
let r: Result<Option<Vec<u8>>, Error> = db.get(b"k2");
assert_eq!(r.unwrap().unwrap(), b"v2");
assert!(db.get(b"k3").unwrap().is_none());
}
}
Loading…
Cancel
Save