Implement log blobs

Summary:
This patch adds the ability for the user to add sequences of arbitrary data (blobs) to write batches. These blobs are saved to the log along with everything else in the write batch. You can add multiple blobs per WriteBatch, and the ordering of blobs, puts, merges, and deletes is preserved.

Blobs are not saved to SST files. RocksDB ignores blobs in every way except for writing them to the log.

Before committing this patch, I need to add some test code. But I'm submitting it now so people can comment on the API.

Test Plan: make -j32 check

Reviewers: dhruba, haobo, vamsi

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D12195
main
Jim Paton 11 years ago
parent d9dd2a1926
commit 0307c5fe3a
  1. 20
      Makefile
  2. 3
      db/db_iter.cc
  3. 45
      db/db_test.cc
  4. 3
      db/dbformat.h
  5. 3
      db/memtable.cc
  6. 4
      db/version_set.cc
  7. 22
      db/write_batch.cc
  8. 51
      db/write_batch_test.cc
  9. 21
      include/leveldb/write_batch.h

@ -37,34 +37,34 @@ VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full
TESTS = \ TESTS = \
arena_test \ arena_test \
auto_roll_logger_test \
block_test \
bloom_test \ bloom_test \
c_test \ c_test \
cache_test \ cache_test \
coding_test \ coding_test \
histogram_test \
corruption_test \ corruption_test \
crc32c_test \ crc32c_test \
db_test \ db_test \
dbformat_test \ dbformat_test \
env_test \ env_test \
filelock_test \
filename_test \ filename_test \
filter_block_test \ filter_block_test \
histogram_test \
log_test \ log_test \
manual_compaction_test \
memenv_test \ memenv_test \
merge_test \
redis_test \
reduce_levels_test \
skiplist_test \ skiplist_test \
stringappend_test \
table_test \ table_test \
ttl_test \ ttl_test \
block_test \
version_edit_test \ version_edit_test \
version_set_test \ version_set_test \
reduce_levels_test \ write_batch_test
write_batch_test \
auto_roll_logger_test \
filelock_test \
merge_test \
redis_test \
manual_compaction_test \
stringappend_test
TOOLS = \ TOOLS = \
sst_dump \ sst_dump \

@ -217,6 +217,9 @@ void DBIter::FindNextUserEntry(bool skipping) {
// TODO: what if !iter_->Valid() // TODO: what if !iter_->Valid()
return; return;
break; break;
case kTypeLogData:
assert(false);
break;
} }
} }
} }

@ -457,6 +457,9 @@ class DBTest {
case kTypeDeletion: case kTypeDeletion:
result += "DEL"; result += "DEL";
break; break;
case kTypeLogData:
assert(false);
break;
} }
} }
iter->Next(); iter->Next();
@ -3132,6 +3135,48 @@ TEST(DBTest, TransactionLogIteratorBatchOperations) {
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
// Writes one WriteBatch that interleaves PutLogData blobs with puts and a
// delete, reopens the DB, then reads the batch back through the transaction
// log iterator and checks that every record -- blobs included -- is replayed
// to a Handler in the order it was added.
TEST(DBTest, TransactionLogIteratorBlobs) {
Options options = OptionsForLogIterTest();
DestroyAndReopen(&options);
{
WriteBatch batch;
batch.Put("key1", DummyString(1024));
batch.Put("key2", DummyString(1024));
batch.PutLogData(Slice("blob1"));
batch.Put("key3", DummyString(1024));
batch.PutLogData(Slice("blob2"));
batch.Delete("key2");
dbfull()->Write(WriteOptions(), &batch);
Reopen(&options);
}
// NOTE(review): the argument 0 is presumably the starting sequence number,
// so this fetches the first batch in the log -- confirm against
// OpenTransactionLogIter.
auto res = OpenTransactionLogIter(0)->GetBatch();
// Records each callback, in call order, by appending a textual description
// to `seen`. Put and Merge log the value's size rather than its contents.
struct Handler : public WriteBatch::Handler {
std::string seen;
virtual void Put(const Slice& key, const Slice& value) {
seen += "Put(" + key.ToString() + ", " + std::to_string(value.size()) +
")";
}
virtual void Merge(const Slice& key, const Slice& value) {
seen += "Merge(" + key.ToString() + ", " + std::to_string(value.size()) +
")";
}
virtual void LogData(const Slice& blob) {
seen += "LogData(" + blob.ToString() + ")";
}
virtual void Delete(const Slice& key) {
seen += "Delete(" + key.ToString() + ")";
}
} handler;
res.writeBatchPtr->Iterate(&handler);
// The expected string mirrors the insertion order used when building the
// batch above.
ASSERT_EQ("Put(key1, 1024)"
"Put(key2, 1024)"
"LogData(blob1)"
"Put(key3, 1024)"
"LogData(blob2)"
"Delete(key2)", handler.seen);
}
TEST(DBTest, ReadCompaction) { TEST(DBTest, ReadCompaction) {
std::string value(4096, '4'); // a string of size 4K std::string value(4096, '4'); // a string of size 4K
{ {

@ -25,7 +25,8 @@ class InternalKey;
enum ValueType { enum ValueType {
kTypeDeletion = 0x0, kTypeDeletion = 0x0,
kTypeValue = 0x1, kTypeValue = 0x1,
kTypeMerge = 0x2 kTypeMerge = 0x2,
kTypeLogData = 0x3
}; };
// kValueTypeForSeek defines the ValueType that should be passed when // kValueTypeForSeek defines the ValueType that should be passed when
// constructing a ParsedInternalKey object for seeking to a particular // constructing a ParsedInternalKey object for seeking to a particular

@ -211,6 +211,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
} }
break; break;
} }
case kTypeLogData:
assert(false);
break;
} }
} else { } else {
// exit loop if user key does not match // exit loop if user key does not match

@ -330,6 +330,10 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
} }
} }
return true; return true;
case kTypeLogData:
assert(false);
break;
} }
} }
} }

@ -43,6 +43,11 @@ void WriteBatch::Handler::Merge(const Slice& key, const Slice& value) {
throw std::runtime_error("Handler::Merge not implemented!"); throw std::runtime_error("Handler::Merge not implemented!");
} }
// Default handling for a blob record: do nothing. WriteBatch::Iterate calls
// this for every kTypeLogData record, so a Handler that does not override
// LogData simply skips blobs.
void WriteBatch::Handler::LogData(const Slice& blob) {
// If the user has not specified something to do with blobs, then we ignore
// them.
}
void WriteBatch::Clear() { void WriteBatch::Clear() {
rep_.clear(); rep_.clear();
rep_.resize(kHeader); rep_.resize(kHeader);
@ -59,10 +64,9 @@ Status WriteBatch::Iterate(Handler* handler) const {
} }
input.remove_prefix(kHeader); input.remove_prefix(kHeader);
Slice key, value; Slice key, value, blob;
int found = 0; int found = 0;
while (!input.empty()) { while (!input.empty()) {
found++;
char tag = input[0]; char tag = input[0];
input.remove_prefix(1); input.remove_prefix(1);
switch (tag) { switch (tag) {
@ -70,6 +74,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
if (GetLengthPrefixedSlice(&input, &key) && if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) { GetLengthPrefixedSlice(&input, &value)) {
handler->Put(key, value); handler->Put(key, value);
found++;
} else { } else {
return Status::Corruption("bad WriteBatch Put"); return Status::Corruption("bad WriteBatch Put");
} }
@ -77,6 +82,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
case kTypeDeletion: case kTypeDeletion:
if (GetLengthPrefixedSlice(&input, &key)) { if (GetLengthPrefixedSlice(&input, &key)) {
handler->Delete(key); handler->Delete(key);
found++;
} else { } else {
return Status::Corruption("bad WriteBatch Delete"); return Status::Corruption("bad WriteBatch Delete");
} }
@ -85,10 +91,18 @@ Status WriteBatch::Iterate(Handler* handler) const {
if (GetLengthPrefixedSlice(&input, &key) && if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) { GetLengthPrefixedSlice(&input, &value)) {
handler->Merge(key, value); handler->Merge(key, value);
found++;
} else { } else {
return Status::Corruption("bad WriteBatch Merge"); return Status::Corruption("bad WriteBatch Merge");
} }
break; break;
case kTypeLogData:
if (GetLengthPrefixedSlice(&input, &blob)) {
handler->LogData(blob);
} else {
return Status::Corruption("bad WriteBatch Blob");
}
break;
default: default:
return Status::Corruption("unknown WriteBatch tag"); return Status::Corruption("unknown WriteBatch tag");
} }
@ -136,6 +150,10 @@ void WriteBatch::Merge(const Slice& key, const Slice& value) {
PutLengthPrefixedSlice(&rep_, value); PutLengthPrefixedSlice(&rep_, value);
} }
// Appends a blob record to the serialized batch: a single kTypeLogData tag
// byte followed by the blob as a length-prefixed slice. Nothing else is
// updated here; in particular this function does not modify the batch's
// entry count.
void WriteBatch::PutLogData(const Slice& blob) {
rep_.push_back(static_cast<char>(kTypeLogData));
PutLengthPrefixedSlice(&rep_, blob);
}
namespace { namespace {
class MemTableInserter : public WriteBatch::Handler { class MemTableInserter : public WriteBatch::Handler {

@ -50,13 +50,16 @@ static std::string PrintContents(WriteBatch* b) {
state.append(")"); state.append(")");
count++; count++;
break; break;
case kTypeLogData:
assert(false);
break;
} }
state.append("@"); state.append("@");
state.append(NumberToString(ikey.sequence)); state.append(NumberToString(ikey.sequence));
} }
delete iter; delete iter;
if (!s.ok()) { if (!s.ok()) {
state.append("ParseError()"); state.append(s.ToString());
} else if (count != WriteBatchInternal::Count(b)) { } else if (count != WriteBatchInternal::Count(b)) {
state.append("CountMismatch()"); state.append("CountMismatch()");
} }
@ -97,7 +100,7 @@ TEST(WriteBatchTest, Corruption) {
WriteBatchInternal::SetContents(&batch, WriteBatchInternal::SetContents(&batch,
Slice(contents.data(),contents.size()-1)); Slice(contents.data(),contents.size()-1));
ASSERT_EQ("Put(foo, bar)@200" ASSERT_EQ("Put(foo, bar)@200"
"ParseError()", "Corruption: bad WriteBatch Delete",
PrintContents(&batch)); PrintContents(&batch));
} }
@ -131,6 +134,50 @@ TEST(WriteBatchTest, Append) {
ASSERT_EQ(4, b1.Count()); ASSERT_EQ(4, b1.Count());
} }
// Builds a batch that mixes puts, a delete, a merge and two blobs, then
// checks three things: blobs are not included in Count(), blobs do not appear
// in the output of PrintContents, and Iterate delivers blobs to a Handler in
// insertion order alongside the other records.
TEST(WriteBatchTest, Blob) {
WriteBatch batch;
batch.Put(Slice("k1"), Slice("v1"));
batch.Put(Slice("k2"), Slice("v2"));
batch.Put(Slice("k3"), Slice("v3"));
batch.PutLogData(Slice("blob1"));
batch.Delete(Slice("k2"));
batch.PutLogData(Slice("blob2"));
batch.Merge(Slice("foo"), Slice("bar"));
// 3 puts + 1 delete + 1 merge; the two blobs are not counted.
ASSERT_EQ(5, batch.Count());
// No LogData entries are expected in the printed contents.
ASSERT_EQ("Merge(foo, bar)@4"
"Put(k1, v1)@0"
"Delete(k2)@3"
"Put(k2, v2)@1"
"Put(k3, v3)@2",
PrintContents(&batch));
// Records each callback, in call order, by appending a textual description
// to `seen`.
struct Handler : public WriteBatch::Handler {
std::string seen;
virtual void Put(const Slice& key, const Slice& value) {
seen += "Put(" + key.ToString() + ", " + value.ToString() + ")";
}
virtual void Merge(const Slice& key, const Slice& value) {
seen += "Merge(" + key.ToString() + ", " + value.ToString() + ")";
}
virtual void LogData(const Slice& blob) {
seen += "LogData(" + blob.ToString() + ")";
}
virtual void Delete(const Slice& key) {
seen += "Delete(" + key.ToString() + ")";
}
} handler;
batch.Iterate(&handler);
// The expected string lists the records in exactly the order they were
// added to the batch above, blobs included.
ASSERT_EQ(
"Put(k1, v1)"
"Put(k2, v2)"
"Put(k3, v3)"
"LogData(blob1)"
"Delete(k2)"
"LogData(blob2)"
"Merge(foo, bar)",
handler.seen);
}
} // namespace leveldb } // namespace leveldb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -43,6 +43,17 @@ class WriteBatch {
// If the database contains a mapping for "key", erase it. Else do nothing. // If the database contains a mapping for "key", erase it. Else do nothing.
void Delete(const Slice& key); void Delete(const Slice& key);
// Append a blob of arbitrary size to the records in this batch. The blob will be
// stored in the transaction log but not in any other file. In particular, it
// will not be persisted to the SST files. When iterating over this
// WriteBatch, WriteBatch::Handler::LogData will be called with the contents
// of the blob as it is encountered. Blobs, puts, deletes, and merges will be
// encountered in the same order in which they were inserted.
//
// Example application: add timestamps to the transaction log for use in
// replication.
void PutLogData(const Slice& blob);
// Clear all updates buffered in this batch. // Clear all updates buffered in this batch.
void Clear(); void Clear();
@ -51,10 +62,12 @@ class WriteBatch {
public: public:
virtual ~Handler(); virtual ~Handler();
virtual void Put(const Slice& key, const Slice& value) = 0; virtual void Put(const Slice& key, const Slice& value) = 0;
// Merge is not pure virtual. Otherwise, we would break existing // Merge and LogData are not pure virtual. Otherwise, we would break
// clients of Handler on a source code level. // existing clients of Handler on a source code level. The default
// The default implementation simply throws a runtime exception. // implementation of Merge simply throws a runtime exception.
virtual void Merge(const Slice& key, const Slice& value); virtual void Merge(const Slice& key, const Slice& value);
// The default implementation of LogData does nothing.
virtual void LogData(const Slice& blob);
virtual void Delete(const Slice& key) = 0; virtual void Delete(const Slice& key) = 0;
}; };
Status Iterate(Handler* handler) const; Status Iterate(Handler* handler) const;
@ -66,7 +79,7 @@ class WriteBatch {
int Count() const; int Count() const;
// Constructor with a serialized string object // Constructor with a serialized string object
WriteBatch(std::string rep): rep_(rep) {} explicit WriteBatch(std::string rep): rep_(rep) {}
private: private:
friend class WriteBatchInternal; friend class WriteBatchInternal;

Loading…
Cancel
Save