From afed60938f068e48bb02c27b128ea1fca62ceb54 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Tue, 5 Mar 2013 21:53:28 -0800 Subject: [PATCH] Fox db_stress crash by copying keys before changing sequencenum to zero. Summary: The compaction process zeros out sequence numbers if the output is part of the bottommost level. The Slice is supposed to refer to an immutable data buffer. The merger that implements the priority queue while reading kvs as the input of a compaction run reies on this fact. The bug was that were updating the sequence number of a record in-place and that was causing suceeding invocations of the merger to return kvs in arbitrary order of sequence numbers. The fix is to copy the key to a local memory buffer before setting its seqno to 0. Test Plan: Set Options.purge_redundant_kvs_while_flush = false and then run db_stress --ops_per_thread=1000 --max_key=320 Reviewers: emayanke, sheki Reviewed By: emayanke CC: leveldb Differential Revision: https://reviews.facebook.net/D9147 --- db/db_impl.cc | 18 ++++++++++++++---- db/dbformat.h | 8 ++++---- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 540f30f54..e9b890c4b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1656,15 +1656,25 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (!drop) { + char* kptr = (char*)key.data(); + std::string kstr; + // Zeroing out the sequence number leads to better compression. // If this is the bottommost level (no files in lower levels) // and the earliest snapshot is larger than this seqno // then we can squash the seqno to zero. if (bottommost_level && ikey.sequence < earliest_snapshot) { assert(ikey.type != kTypeDeletion); - UpdateInternalKey(key, (uint64_t)0, ikey.type); + // make a copy because updating in place would cause problems + // with the priority queue that is managing the input key iterator + kstr.assign(key.data(), key.size()); + kptr = (char *)kstr.c_str(); + UpdateInternalKey(kptr, key.size(), (uint64_t)0, ikey.type); } + Slice newkey(kptr, key.size()); + assert((key.clear(), 1)); // we do not need 'key' anymore + // Open output file if necessary if (compact->builder == nullptr) { status = OpenCompactionOutputFile(compact); @@ -1673,10 +1683,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } } if (compact->builder->NumEntries() == 0) { - compact->current_output()->smallest.DecodeFrom(key); + compact->current_output()->smallest.DecodeFrom(newkey); } - compact->current_output()->largest.DecodeFrom(key); - compact->builder->Add(key, value); + compact->current_output()->largest.DecodeFrom(newkey); + compact->builder->Add(newkey, value); // Close output file if it is big enough if (compact->builder->FileSize() >= diff --git a/db/dbformat.h b/db/dbformat.h index ceedc0438..fb8525b0b 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -158,11 +158,11 @@ inline bool ParseInternalKey(const Slice& internal_key, } // Update the sequence number in the internal key -inline void UpdateInternalKey(const Slice& internal_key, +inline void UpdateInternalKey(char* internal_key, + const size_t internal_key_size, uint64_t seq, ValueType t) { - const size_t n = internal_key.size(); - assert(n >= 8); - char* seqtype = (char *)internal_key.data() + n - 8; + assert(internal_key_size >= 8); + char* seqtype = internal_key + internal_key_size - 8; uint64_t newval = (seq << 8) | t; EncodeFixed64(seqtype, newval); }