Fork of https://github.com/oxigraph/rocksdb and https://github.com/facebook/rocksdb, maintained for NextGraph and Oxigraph.
				
			
			
		
			You cannot select more than 25 topics.
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							223 lines
						
					
					
						
							8.1 KiB
						
					
					
				
			
		
		
	
	
							223 lines
						
					
					
						
							8.1 KiB
						
					
					
				| //  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
 | |
| //  This source code is licensed under the BSD-style license found in the
 | |
| //  LICENSE file in the root directory of this source tree. An additional grant
 | |
| //  of patent rights can be found in the PATENTS file in the same directory.
 | |
| //
 | |
| // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 | |
| // Use of this source code is governed by a BSD-style license that can be
 | |
| // found in the LICENSE file. See the AUTHORS file for names of contributors.
 | |
| 
 | |
| #include "db/builder.h"
 | |
| 
 | |
| #include "db/dbformat.h"
 | |
| #include "db/filename.h"
 | |
| #include "db/merge_helper.h"
 | |
| #include "db/table_cache.h"
 | |
| #include "db/version_edit.h"
 | |
| #include "rocksdb/db.h"
 | |
| #include "rocksdb/env.h"
 | |
| #include "rocksdb/iterator.h"
 | |
| #include "rocksdb/options.h"
 | |
| #include "rocksdb/table.h"
 | |
| #include "table/block_based_table_builder.h"
 | |
| #include "util/stop_watch.h"
 | |
| 
 | |
| namespace rocksdb {
 | |
| 
 | |
| class TableFactory;
 | |
| 
 | |
| TableBuilder* NewTableBuilder(const Options& options,
 | |
|                               const InternalKeyComparator& internal_comparator,
 | |
|                               WritableFile* file,
 | |
|                               CompressionType compression_type) {
 | |
|   return options.table_factory->NewTableBuilder(options, internal_comparator,
 | |
|                                                 file, compression_type);
 | |
| }
 | |
| 
 | |
| Status BuildTable(const std::string& dbname, Env* env, const Options& options,
 | |
|                   const EnvOptions& soptions, TableCache* table_cache,
 | |
|                   Iterator* iter, FileMetaData* meta,
 | |
|                   const InternalKeyComparator& internal_comparator,
 | |
|                   const SequenceNumber newest_snapshot,
 | |
|                   const SequenceNumber earliest_seqno_in_memtable,
 | |
|                   const CompressionType compression) {
 | |
|   Status s;
 | |
|   meta->file_size = 0;
 | |
|   meta->smallest_seqno = meta->largest_seqno = 0;
 | |
|   iter->SeekToFirst();
 | |
| 
 | |
|   // If the sequence number of the smallest entry in the memtable is
 | |
|   // smaller than the most recent snapshot, then we do not trigger
 | |
|   // removal of duplicate/deleted keys as part of this builder.
 | |
|   bool purge = options.purge_redundant_kvs_while_flush;
 | |
|   if (earliest_seqno_in_memtable <= newest_snapshot) {
 | |
|     purge = false;
 | |
|   }
 | |
| 
 | |
|   std::string fname = TableFileName(dbname, meta->number);
 | |
|   if (iter->Valid()) {
 | |
|     unique_ptr<WritableFile> file;
 | |
|     s = env->NewWritableFile(fname, &file, soptions);
 | |
|     if (!s.ok()) {
 | |
|       return s;
 | |
|     }
 | |
| 
 | |
|     TableBuilder* builder =
 | |
|         NewTableBuilder(options, internal_comparator, file.get(), compression);
 | |
| 
 | |
|     // the first key is the smallest key
 | |
|     Slice key = iter->key();
 | |
|     meta->smallest.DecodeFrom(key);
 | |
|     meta->smallest_seqno = GetInternalKeySeqno(key);
 | |
|     meta->largest_seqno = meta->smallest_seqno;
 | |
| 
 | |
|     MergeHelper merge(internal_comparator.user_comparator(),
 | |
|                       options.merge_operator.get(), options.info_log.get(),
 | |
|                       true /* internal key corruption is not ok */);
 | |
| 
 | |
|     if (purge) {
 | |
|       // Ugly walkaround to avoid compiler error for release build
 | |
|       bool ok __attribute__((unused)) = true;
 | |
| 
 | |
|       // Will write to builder if current key != prev key
 | |
|       ParsedInternalKey prev_ikey;
 | |
|       std::string prev_key;
 | |
|       bool is_first_key = true;    // Also write if this is the very first key
 | |
| 
 | |
|       while (iter->Valid()) {
 | |
|         bool iterator_at_next = false;
 | |
| 
 | |
|         // Get current key
 | |
|         ParsedInternalKey this_ikey;
 | |
|         Slice key = iter->key();
 | |
|         Slice value = iter->value();
 | |
| 
 | |
|         // In-memory key corruption is not ok;
 | |
|         // TODO: find a clean way to treat in memory key corruption
 | |
|         ok = ParseInternalKey(key, &this_ikey);
 | |
|         assert(ok);
 | |
|         assert(this_ikey.sequence >= earliest_seqno_in_memtable);
 | |
| 
 | |
|         // If the key is the same as the previous key (and it is not the
 | |
|         // first key), then we skip it, since it is an older version.
 | |
|         // Otherwise we output the key and mark it as the "new" previous key.
 | |
|         if (!is_first_key && !internal_comparator.user_comparator()->Compare(
 | |
|                                   prev_ikey.user_key, this_ikey.user_key)) {
 | |
|           // seqno within the same key are in decreasing order
 | |
|           assert(this_ikey.sequence < prev_ikey.sequence);
 | |
|         } else {
 | |
|           is_first_key = false;
 | |
| 
 | |
|           if (this_ikey.type == kTypeMerge) {
 | |
|             // Handle merge-type keys using the MergeHelper
 | |
|             // TODO: pass statistics to MergeUntil
 | |
|             merge.MergeUntil(iter, 0 /* don't worry about snapshot */);
 | |
|             iterator_at_next = true;
 | |
|             if (merge.IsSuccess()) {
 | |
|               // Merge completed correctly.
 | |
|               // Add the resulting merge key/value and continue to next
 | |
|               builder->Add(merge.key(), merge.value());
 | |
|               prev_key.assign(merge.key().data(), merge.key().size());
 | |
|               ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
 | |
|               assert(ok);
 | |
|             } else {
 | |
|               // Merge did not find a Put/Delete.
 | |
|               // Can not compact these merges into a kValueType.
 | |
|               // Write them out one-by-one. (Proceed back() to front())
 | |
|               const std::deque<std::string>& keys = merge.keys();
 | |
|               const std::deque<std::string>& values = merge.values();
 | |
|               assert(keys.size() == values.size() && keys.size() >= 1);
 | |
|               std::deque<std::string>::const_reverse_iterator key_iter;
 | |
|               std::deque<std::string>::const_reverse_iterator value_iter;
 | |
|               for (key_iter=keys.rbegin(), value_iter = values.rbegin();
 | |
|                    key_iter != keys.rend() && value_iter != values.rend();
 | |
|                    ++key_iter, ++value_iter) {
 | |
| 
 | |
|                 builder->Add(Slice(*key_iter), Slice(*value_iter));
 | |
|               }
 | |
| 
 | |
|               // Sanity check. Both iterators should end at the same time
 | |
|               assert(key_iter == keys.rend() && value_iter == values.rend());
 | |
| 
 | |
|               prev_key.assign(keys.front());
 | |
|               ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
 | |
|               assert(ok);
 | |
|             }
 | |
|           } else {
 | |
|             // Handle Put/Delete-type keys by simply writing them
 | |
|             builder->Add(key, value);
 | |
|             prev_key.assign(key.data(), key.size());
 | |
|             ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
 | |
|             assert(ok);
 | |
|           }
 | |
|         }
 | |
| 
 | |
|         if (!iterator_at_next) iter->Next();
 | |
|       }
 | |
| 
 | |
|       // The last key is the largest key
 | |
|       meta->largest.DecodeFrom(Slice(prev_key));
 | |
|       SequenceNumber seqno = GetInternalKeySeqno(Slice(prev_key));
 | |
|       meta->smallest_seqno = std::min(meta->smallest_seqno, seqno);
 | |
|       meta->largest_seqno = std::max(meta->largest_seqno, seqno);
 | |
| 
 | |
|     } else {
 | |
|       for (; iter->Valid(); iter->Next()) {
 | |
|         Slice key = iter->key();
 | |
|         meta->largest.DecodeFrom(key);
 | |
|         builder->Add(key, iter->value());
 | |
|         SequenceNumber seqno = GetInternalKeySeqno(key);
 | |
|         meta->smallest_seqno = std::min(meta->smallest_seqno, seqno);
 | |
|         meta->largest_seqno = std::max(meta->largest_seqno, seqno);
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     // Finish and check for builder errors
 | |
|     if (s.ok()) {
 | |
|       s = builder->Finish();
 | |
|       if (s.ok()) {
 | |
|         meta->file_size = builder->FileSize();
 | |
|         assert(meta->file_size > 0);
 | |
|       }
 | |
|     } else {
 | |
|       builder->Abandon();
 | |
|     }
 | |
|     delete builder;
 | |
| 
 | |
|     // Finish and check for file errors
 | |
|     if (s.ok() && !options.disableDataSync) {
 | |
|       if (options.use_fsync) {
 | |
|         StopWatch sw(env, options.statistics.get(), TABLE_SYNC_MICROS);
 | |
|         s = file->Fsync();
 | |
|       } else {
 | |
|         StopWatch sw(env, options.statistics.get(), TABLE_SYNC_MICROS);
 | |
|         s = file->Sync();
 | |
|       }
 | |
|     }
 | |
|     if (s.ok()) {
 | |
|       s = file->Close();
 | |
|     }
 | |
| 
 | |
|     if (s.ok()) {
 | |
|       // Verify that the table is usable
 | |
|       Iterator* it = table_cache->NewIterator(ReadOptions(), soptions,
 | |
|                                               internal_comparator, *meta);
 | |
|       s = it->status();
 | |
|       delete it;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // Check for input iterator errors
 | |
|   if (!iter->status().ok()) {
 | |
|     s = iter->status();
 | |
|   }
 | |
| 
 | |
|   if (s.ok() && meta->file_size > 0) {
 | |
|     // Keep it
 | |
|   } else {
 | |
|     env->DeleteFile(fname);
 | |
|   }
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| }  // namespace rocksdb
 | |
| 
 |