|
|
|
@ -1323,26 +1323,19 @@ impl FileBulkLoader { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn encode(&mut self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> { |
|
|
|
|
let mut buffer = Vec::new(); |
|
|
|
|
for quad in quads { |
|
|
|
|
let encoded = EncodedQuad::from(quad.as_ref()); |
|
|
|
|
buffer.clear(); |
|
|
|
|
if quad.graph_name.is_default_graph() { |
|
|
|
|
write_spo_quad(&mut buffer, &encoded); |
|
|
|
|
if self.triples.insert(encoded.clone()) { |
|
|
|
|
self.insert_term(quad.subject.as_ref().into(), &encoded.subject)?; |
|
|
|
|
self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate)?; |
|
|
|
|
self.insert_term(quad.object.as_ref(), &encoded.object)?; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
write_spog_quad(&mut buffer, &encoded); |
|
|
|
|
if self.quads.insert(encoded.clone()) { |
|
|
|
|
} else if self.quads.insert(encoded.clone()) { |
|
|
|
|
self.insert_term(quad.subject.as_ref().into(), &encoded.subject)?; |
|
|
|
|
self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate)?; |
|
|
|
|
self.insert_term(quad.object.as_ref(), &encoded.object)?; |
|
|
|
|
|
|
|
|
|
buffer.clear(); |
|
|
|
|
write_term(&mut buffer, &encoded.graph_name); |
|
|
|
|
if self.graphs.insert(encoded.graph_name.clone()) { |
|
|
|
|
self.insert_term( |
|
|
|
|
match quad.graph_name.as_ref() { |
|
|
|
@ -1355,7 +1348,6 @@ impl FileBulkLoader { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1405,11 +1397,11 @@ impl FileBulkLoader { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if !self.quads.is_empty() { |
|
|
|
|
let quads = take(&mut self.graphs); |
|
|
|
|
to_load.push(( |
|
|
|
|
&self.storage.graphs_cf, |
|
|
|
|
self.build_sst_for_keys(quads.into_iter().map(|g| encode_term(&g)))?, |
|
|
|
|
self.build_sst_for_keys(self.graphs.iter().map(encode_term))?, |
|
|
|
|
)); |
|
|
|
|
self.graphs.clear(); |
|
|
|
|
|
|
|
|
|
to_load.push(( |
|
|
|
|
&self.storage.gspo_cf, |
|
|
|
@ -1500,19 +1492,9 @@ impl FileBulkLoader { |
|
|
|
|
) -> Result<PathBuf, StorageError> { |
|
|
|
|
let mut values = values.collect::<Vec<_>>(); |
|
|
|
|
values.sort_unstable(); |
|
|
|
|
let deduplicated_values = values.iter().enumerate().filter_map(|(i, value)| { |
|
|
|
|
if values |
|
|
|
|
.get(i + 1) |
|
|
|
|
.map_or(true, |next_value| value != next_value) |
|
|
|
|
{ |
|
|
|
|
Some(value) |
|
|
|
|
} else { |
|
|
|
|
None |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
let mut sst = self.storage.db.new_sst_file()?; |
|
|
|
|
for value in deduplicated_values { |
|
|
|
|
sst.insert_empty(value)?; |
|
|
|
|
for value in values { |
|
|
|
|
sst.insert_empty(&value)?; |
|
|
|
|
} |
|
|
|
|
sst.finish() |
|
|
|
|
} |
|
|
|
|