WriteBatch::Put() overload that gathers key and value from arrays of slices

Summary: In our project, when writing to the database, we want to form the value as the concatenation of a small header and a larger payload.  It's a shame to have to copy the payload just so we can give RocksDB API a linear view of the value.  Since RocksDB makes a copy internally, it's easy to support gather writes.

Test Plan: write_batch_test, new test case

Reviewers: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13947
main
lovro 11 years ago
parent 1510339e52
commit 8a46ecd357
  1. 7
      db/write_batch.cc
  2. 28
      db/write_batch_test.cc
  3. 10
      include/rocksdb/slice.h
  4. 6
      include/rocksdb/write_batch.h
  5. 12
      util/coding.cc
  6. 2
      util/coding.h

@ -146,6 +146,13 @@ void WriteBatch::Put(const Slice& key, const Slice& value) {
PutLengthPrefixedSlice(&rep_, value); PutLengthPrefixedSlice(&rep_, value);
} }
void WriteBatch::Put(const SliceParts& key, const SliceParts& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSliceParts(&rep_, key);
PutLengthPrefixedSliceParts(&rep_, value);
}
void WriteBatch::Delete(const Slice& key) { void WriteBatch::Delete(const Slice& key) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeDeletion)); rep_.push_back(static_cast<char>(kTypeDeletion));

@ -227,6 +227,34 @@ TEST(WriteBatchTest, Continue) {
handler.seen); handler.seen);
} }
TEST(WriteBatchTest, PutGatherSlices) {
WriteBatch batch;
batch.Put(Slice("foo"), Slice("bar"));
{
// Try a write where the key is one slice but the value is two
Slice key_slice("baz");
Slice value_slices[2] = { Slice("header"), Slice("payload") };
batch.Put(SliceParts(&key_slice, 1),
SliceParts(value_slices, 2));
}
{
// One where the key is composite but the value is a single slice
Slice key_slices[3] = { Slice("key"), Slice("part2"), Slice("part3") };
Slice value_slice("value");
batch.Put(SliceParts(key_slices, 3),
SliceParts(&value_slice, 1));
}
WriteBatchInternal::SetSequence(&batch, 100);
ASSERT_EQ("Put(baz, headerpayload)@101"
"Put(foo, bar)@100"
"Put(keypart2part3, value)@102",
PrintContents(&batch));
ASSERT_EQ(3, batch.Count());
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -98,6 +98,16 @@ class Slice {
// Intentionally copyable // Intentionally copyable
}; };
// A set of Slices that are virtually concatenated together. 'parts' points
// to an array of Slices. The number of elements in the array is 'num_parts'.
struct SliceParts {
SliceParts(const Slice* parts, int num_parts) :
parts(parts), num_parts(num_parts) { }
const Slice* parts;
int num_parts;
};
inline bool operator==(const Slice& x, const Slice& y) { inline bool operator==(const Slice& x, const Slice& y) {
return ((x.size() == y.size()) && return ((x.size() == y.size()) &&
(memcmp(x.data(), y.data(), x.size()) == 0)); (memcmp(x.data(), y.data(), x.size()) == 0));

@ -27,6 +27,7 @@
namespace rocksdb { namespace rocksdb {
class Slice; class Slice;
class SliceParts;
class WriteBatch { class WriteBatch {
public: public:
@ -36,6 +37,11 @@ class WriteBatch {
// Store the mapping "key->value" in the database. // Store the mapping "key->value" in the database.
void Put(const Slice& key, const Slice& value); void Put(const Slice& key, const Slice& value);
// Variant of Put() that gathers output like writev(2). The key and value
// that will be written to the database are concatentations of arrays of
// slices.
void Put(const SliceParts& key, const SliceParts& value);
// Merge "value" with the existing value of "key" in the database. // Merge "value" with the existing value of "key" in the database.
// "key->merge(existing, value)" // "key->merge(existing, value)"
void Merge(const Slice& key, const Slice& value); void Merge(const Slice& key, const Slice& value);

@ -107,6 +107,18 @@ void PutLengthPrefixedSlice(std::string* dst, const Slice& value) {
dst->append(value.data(), value.size()); dst->append(value.data(), value.size());
} }
void PutLengthPrefixedSliceParts(std::string* dst,
const SliceParts& slice_parts) {
uint32_t total_bytes = 0;
for (int i = 0; i < slice_parts.num_parts; ++i) {
total_bytes += slice_parts.parts[i].size();
}
PutVarint32(dst, total_bytes);
for (int i = 0; i < slice_parts.num_parts; ++i) {
dst->append(slice_parts.parts[i].data(), slice_parts.parts[i].size());
}
}
int VarintLength(uint64_t v) { int VarintLength(uint64_t v) {
int len = 1; int len = 1;
while (v >= 128) { while (v >= 128) {

@ -30,6 +30,8 @@ extern void PutFixed64(std::string* dst, uint64_t value);
extern void PutVarint32(std::string* dst, uint32_t value); extern void PutVarint32(std::string* dst, uint32_t value);
extern void PutVarint64(std::string* dst, uint64_t value); extern void PutVarint64(std::string* dst, uint64_t value);
extern void PutLengthPrefixedSlice(std::string* dst, const Slice& value); extern void PutLengthPrefixedSlice(std::string* dst, const Slice& value);
extern void PutLengthPrefixedSliceParts(std::string* dst,
const SliceParts& slice_parts);
// Standard Get... routines parse a value from the beginning of a Slice // Standard Get... routines parse a value from the beginning of a Slice
// and advance the slice past the parsed value. // and advance the slice past the parsed value.

Loading…
Cancel
Save