tuple uniqueness constraint check?
Closed this issue · 0 comments
github-actions commented
If this is a tuple with a unique constraint, we need to check if there's a
newer tuple with the same domain value.
/ commit set in the process.
before releasing the lock.
moor/crates/db/src/rdb/tx/transaction.rs
Line 321 in 12fc849
/// A set of tuples to be committed to the canonical base relations, based on a transaction's
/// working set.
///
/// Built in two phases: `prepare` validates the working set and stages changes into forked
/// copies of the affected relations, then `try_commit` writes those forks over the canonical
/// relations. The write guard is a field, so the lock is held for as long as this value lives.
pub struct CommitSet<'a> {
// Timestamp of this commit; stamped onto staged tuples during `prepare` and onto each forked
// relation when it is swapped in by `try_commit`.
ts: u64,
// Forked copies of the base relations touched by this commit, indexed by relation id.
// A slot stays empty until the corresponding relation is first forked.
relations: Box<BitArray<BaseRelation, 64, Bitset64<1>>>,
// Holds a lock on the base relations, which we'll swap out with the new relations at successful commit
write_guard: RwLockWriteGuard<'a, Vec<BaseRelation>>,
// Marker fields: a CommitSet can't/shouldn't be moved around or shared between threads.
// NOTE(review): presumably these make the type !Send / !Sync — confirm against their definitions.
unsend: PhantomUnsend,
unsync: PhantomUnsync,
}
impl<'a> CommitSet<'a> {
/// Create an empty commit set for the commit timestamp `ts`.
///
/// Takes ownership of `write_guard`, the write lock over the canonical base relations, and
/// keeps it for the lifetime of the returned value.
pub(crate) fn new(ts: u64, write_guard: RwLockWriteGuard<'a, Vec<BaseRelation>>) -> Self {
    // Nothing has been forked yet; forks are added on demand while preparing the commit.
    let relations = Box::new(BitArray::new());
    Self {
        ts,
        relations,
        write_guard,
        unsend: Default::default(),
        unsync: Default::default(),
    }
}
/// Validate a transaction's working set against the canonical relations and stage the result.
///
/// Every tuple operation in `tx_working_set` is checked against the canonical relation held
/// under the write lock. Operations that pass are applied to a forked copy of that relation
/// (see `fork`), with inserted and updated tuples stamped with this commit's timestamp.
/// Nothing is written to the canonical relations here; this only produces a candidate commit.
///
/// # Errors
/// * `CommitError::UniqueConstraintViolation` — a canonical tuple with the same domain is newer
///   than an inserted tuple in a unique-domain relation, or newer than a tombstoned tuple that
///   has already been removed from a unique-domain relation.
/// * `CommitError::TupleVersionConflict` — a canonical tuple with the same domain is newer than
///   an inserted tuple in a non-unique relation, or the tuple targeted by an update no longer
///   exists canonically.
///
/// # Panics
/// Panics if applying an already-validated operation to the forked relation fails.
pub(crate) fn prepare(&mut self, tx_working_set: &mut WorkingSet) -> Result<(), CommitError> {
    for (_, local_relation) in tx_working_set.relations.iter_mut() {
        let relation_id = local_relation.id;

        // Walk the local working set and decide, per tuple, whether it is safe to commit.
        for entry in local_relation.tuples_iter_mut() {
            match &mut entry.op {
                TxTupleOp::Insert(inserted) => {
                    // Inserts only conflict through canonical tuples sharing the same domain
                    // value, so those are the only ones that need inspecting.
                    let base = &self.write_guard[relation_id.0];
                    let mut superseded = im::HashSet::new();
                    for existing in base.seek_by_domain(inserted.domain()) {
                        // A canonical tuple newer than ours means someone else committed
                        // first. On a unique-domain relation that is reported as a
                        // constraint violation, otherwise as a version conflict.
                        if existing.ts() > inserted.ts() {
                            return Err(if base.unique_domain {
                                CommitError::UniqueConstraintViolation
                            } else {
                                CommitError::TupleVersionConflict
                            });
                        }
                        // Older (or same-age) tuples get clobbered by our value.
                        superseded.insert(existing);
                    }

                    // Safe: stage the insert in our fork, replacing what it supersedes.
                    inserted.update_timestamp(self.ts);
                    let fork = self.fork(relation_id);
                    for existing in superseded {
                        fork.remove_tuple(&existing.id()).unwrap();
                    }
                    fork.insert_tuple(inserted.clone()).unwrap();
                }
                TxTupleOp::Update {
                    from_tuple,
                    to_tuple,
                } => {
                    // The tuple being updated must still exist canonically; if someone
                    // deleted it first, that is by definition a conflict.
                    let still_present =
                        self.write_guard[relation_id.0].has_tuple(&from_tuple.id());
                    if !still_present {
                        return Err(CommitError::TupleVersionConflict);
                    }

                    // TODO tuple uniqueness constraint check?

                    // Stage the new version in the forked relation.
                    to_tuple.update_timestamp(self.ts);
                    self.fork(relation_id)
                        .update_tuple(&from_tuple.id(), to_tuple.clone())
                        .unwrap();
                }
                TxTupleOp::Tombstone(doomed, _) => {
                    let base = &self.write_guard[relation_id.0];
                    if base.has_tuple(&doomed.id()) {
                        // Still there: perform the delete in our fork.
                        self.fork(relation_id).remove_tuple(&doomed.id()).unwrap();
                    } else if base.unique_domain {
                        // Already deleted by someone else. With a unique constraint, a
                        // newer tuple under the same domain value is a violation.
                        for existing in base.seek_by_domain(doomed.domain()) {
                            if existing.ts() > doomed.ts() {
                                return Err(CommitError::UniqueConstraintViolation);
                            }
                        }
                    }
                    // Otherwise: no conflict and already deleted, so nothing to do.
                }
                // Plain values carry no pending change.
                TxTupleOp::Value(_) => {}
            }
        }
    }
    Ok(())
}
/// Publish the prepared changes by overwriting the canonical relations with their forks.
///
/// Consumes the commit set. Each forked relation is stamped with the commit timestamp and
/// written into its slot behind the write guard; the guard is dropped along with `self` when
/// this returns. As written, this always returns `Ok(())`.
pub(crate) fn try_commit(mut self) -> Result<(), CommitError> {
    // Validation already happened, so all that remains is swapping in the new relations
    // while the lock is still held.
    let stamp = self.ts;
    for (_, mut forked) in self.relations.take_all() {
        forked.ts = stamp;
        let slot = forked.id.0;
        self.write_guard[slot] = forked;
    }
    Ok(())
}
/// Fork the given base relation into the commit set, if it's not already there.
fn fork(&mut self, relation_id: RelationId) -> &mut BaseRelation {
if self.relations.get(relation_id.0).is_none() {
let r = self.write_guard[relation_id.0].clone();
self.relations.set(relation_id.0, r);
}
self.relations.get_mut(relation_id.0).unwrap()