diff --git a/db/write_batch.cc b/db/write_batch.cc index 57ced9480..134cfb63c 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -146,6 +146,13 @@ void WriteBatch::Put(const Slice& key, const Slice& value) { PutLengthPrefixedSlice(&rep_, value); } +void WriteBatch::Put(const SliceParts& key, const SliceParts& value) { + WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); + rep_.push_back(static_cast(kTypeValue)); + PutLengthPrefixedSliceParts(&rep_, key); + PutLengthPrefixedSliceParts(&rep_, value); +} + void WriteBatch::Delete(const Slice& key) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); rep_.push_back(static_cast(kTypeDeletion)); diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index c97201973..4aba21ca4 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -227,6 +227,34 @@ TEST(WriteBatchTest, Continue) { handler.seen); } +TEST(WriteBatchTest, PutGatherSlices) { + WriteBatch batch; + batch.Put(Slice("foo"), Slice("bar")); + + { + // Try a write where the key is one slice but the value is two + Slice key_slice("baz"); + Slice value_slices[2] = { Slice("header"), Slice("payload") }; + batch.Put(SliceParts(&key_slice, 1), + SliceParts(value_slices, 2)); + } + + { + // One where the key is composite but the value is a single slice + Slice key_slices[3] = { Slice("key"), Slice("part2"), Slice("part3") }; + Slice value_slice("value"); + batch.Put(SliceParts(key_slices, 3), + SliceParts(&value_slice, 1)); + } + + WriteBatchInternal::SetSequence(&batch, 100); + ASSERT_EQ("Put(baz, headerpayload)@101" + "Put(foo, bar)@100" + "Put(keypart2part3, value)@102", + PrintContents(&batch)); + ASSERT_EQ(3, batch.Count()); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/slice.h b/include/rocksdb/slice.h index 141480688..86199862c 100644 --- a/include/rocksdb/slice.h +++ b/include/rocksdb/slice.h @@ -98,6 +98,16 @@ class Slice { // Intentionally copyable }; +// A set of Slices that are virtually concatenated together. 'parts' points +// to an array of Slices. The number of elements in the array is 'num_parts'. +struct SliceParts { + SliceParts(const Slice* parts, int num_parts) : + parts(parts), num_parts(num_parts) { } + + const Slice* parts; + int num_parts; +}; + inline bool operator==(const Slice& x, const Slice& y) { return ((x.size() == y.size()) && (memcmp(x.data(), y.data(), x.size()) == 0)); diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index e6f9ba3b2..36f135b83 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -27,6 +27,7 @@ namespace rocksdb { class Slice; +class SliceParts; class WriteBatch { public: @@ -36,6 +37,11 @@ class WriteBatch { // Store the mapping "key->value" in the database. void Put(const Slice& key, const Slice& value); + // Variant of Put() that gathers output like writev(2). The key and value + // that will be written to the database are concatentations of arrays of + // slices. + void Put(const SliceParts& key, const SliceParts& value); + // Merge "value" with the existing value of "key" in the database. // "key->merge(existing, value)" void Merge(const Slice& key, const Slice& value); diff --git a/util/coding.cc b/util/coding.cc index 8e596c9c0..2d70647fb 100644 --- a/util/coding.cc +++ b/util/coding.cc @@ -107,6 +107,18 @@ void PutLengthPrefixedSlice(std::string* dst, const Slice& value) { dst->append(value.data(), value.size()); } +void PutLengthPrefixedSliceParts(std::string* dst, + const SliceParts& slice_parts) { + uint32_t total_bytes = 0; + for (int i = 0; i < slice_parts.num_parts; ++i) { + total_bytes += slice_parts.parts[i].size(); + } + PutVarint32(dst, total_bytes); + for (int i = 0; i < slice_parts.num_parts; ++i) { + dst->append(slice_parts.parts[i].data(), slice_parts.parts[i].size()); + } +} + int VarintLength(uint64_t v) { int len = 1; while (v >= 128) { diff --git a/util/coding.h b/util/coding.h index 2c7d99891..3fd892f79 100644 --- a/util/coding.h +++ b/util/coding.h @@ -30,6 +30,8 @@ extern void PutFixed64(std::string* dst, uint64_t value); extern void PutVarint32(std::string* dst, uint32_t value); extern void PutVarint64(std::string* dst, uint64_t value); extern void PutLengthPrefixedSlice(std::string* dst, const Slice& value); +extern void PutLengthPrefixedSliceParts(std::string* dst, + const SliceParts& slice_parts); // Standard Get... routines parse a value from the beginning of a Slice // and advance the slice past the parsed value.