Skip to content

Commit 9257340

Browse files
authored
Fix a corner case issue that causes inconsistent Coordinator states when lazy recovery happens before group commit (#2135)
1 parent 26c4314 commit 9257340

File tree

10 files changed

+1904
-174
lines changed

10 files changed

+1904
-174
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/Coordinator.java

+100
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,106 @@ void putStateForGroupCommit(
146146
put(put);
147147
}
148148

149+
public void putStateForLazyRecoveryRollback(String id) throws CoordinatorException {
150+
if (keyManipulator.isFullKey(id)) {
151+
putStateForLazyRecoveryRollbackForGroupCommit(id);
152+
return;
153+
}
154+
155+
putState(new Coordinator.State(id, TransactionState.ABORTED));
156+
}
157+
158+
private void putStateForLazyRecoveryRollbackForGroupCommit(String id)
159+
throws CoordinatorException {
160+
// Lazy recoveries don't know which the transaction that created the PREPARE record is using, a
161+
// parent ID or a full ID as `tx_id` partition key.
162+
//
163+
// Case a) If a transaction becomes "ready for commit" in time, it'll be committed in a group
164+
// with `tx_id: <parent tx ID>`.
165+
// Case b) If a transaction is delayed, it'll be committed in an isolated group with a full ID
166+
// as `tx_id: <full tx ID>`.
167+
//
168+
// If lazy recoveries only insert a record with `tx_id: <full tx ID>` to abort the transaction,
169+
// it will not conflict with the group commit using `tx_id: <parent tx ID>` in case #a.
170+
// Therefore, lazy recoveries first need to insert a record with `tx_id: <parent tx ID>` and
171+
// empty `tx_child_ids` to the Coordinator table. We'll call this insertion
172+
// `lazy-recovery-abort-with-parent-id`. This record is intended to conflict with a potential
173+
// group commit considering case#1, even though it doesn't help in finding the coordinator state
174+
// since `tx_child_ids` is empty.
175+
//
176+
// Once the record insertion with `tx_id: <parent tx ID>` succeeds, the lazy recovery will
177+
// insert another record with `tx_id: <full tx ID>`. We'll call this insertion
178+
// `lazy-recovery-abort-with-full-id`. This record insertion is needed to conflict with a
179+
// potential delayed group commit that has `tx_id: <full tx ID>` in case #b, and indicates the
180+
// transaction is aborted.
181+
//
182+
// Let's walk through all the cases.
183+
//
184+
// A. The original commit with `tx_id: <parent tx ID>` succeeds in case #a, and then lazy
185+
// recovery happens
186+
// - The original commit with `tx_id: <parent tx ID>` succeeds
187+
// - `lazy-recovery-abort-with-parent-id` fails
188+
// - The transaction is treated as committed since the commit's `tx_child_ids` contains the
189+
// transaction child ID
190+
//
191+
// B. The original commit with `tx_id: <parent tx ID>` is in-progress in case #a, and lazy
192+
// recovery happens first
193+
// - `lazy-recovery-abort-with-parent-id` succeeds
194+
// - The original commit with `tx_id: <parent tx ID>` fails
195+
// - (If the lazy recovery crashes here, another lazy recovery will insert the below
196+
// `lazy-recovery-abort-with-full-id` later)
197+
// - `lazy-recovery-abort-with-full-id` succeeds
198+
// - The transaction is treated as aborted because of `lazy-recovery-abort-with-full-id`
199+
//
200+
// C. The original commit with `tx_id: <full tx ID>` is done in case #b, and then lazy recovery
201+
// happens
202+
// - The original commit with `tx_id: <full tx ID>` succeeds
203+
// - `lazy-recovery-abort-with-parent-id` succeeds
204+
// - `lazy-recovery-abort-with-full-id` fails
205+
// - The transaction is treated as committed since the commit `tx_id` is the transaction full
206+
// ID
207+
//
208+
// D. The original commit with `tx_id: <full tx ID>` is in-progress in case #b, and lazy
209+
// recovery happens first
210+
// - `lazy-recovery-abort-with-parent-id` succeeds
211+
// - (If the lazy recovery crashes here and the original commit happens, the situation will be
212+
// the same as C)
213+
// - `lazy-recovery-abort-with-full-id` succeeds
214+
// - The original commit with `tx_id: <full tx ID>` fails
215+
// - The transaction is treated as aborted because of `lazy-recovery-abort-with-full-id`
216+
Keys<String, String, String> keys = keyManipulator.keysFromFullKey(id);
217+
try {
218+
// This record is to prevent a group commit that has the same parent ID considering case #a
219+
// regardless if the transaction is actually in a group commit (case #a) or a delayed commit
220+
// (case #b).
221+
putStateForGroupCommit(
222+
keys.parentKey,
223+
Collections.emptyList(),
224+
TransactionState.ABORTED,
225+
System.currentTimeMillis());
226+
} catch (CoordinatorConflictException e) {
227+
// The group commit finished already, although there may be ongoing delayed groups.
228+
229+
// If the group commit contains the transaction, follow the state.
230+
// Otherwise, continue to insert a record with the full ID.
231+
Optional<State> optState = getState(keys.parentKey);
232+
if (!optState.isPresent()) {
233+
throw new AssertionError();
234+
}
235+
State state = optState.get();
236+
if (state.getChildIds().contains(keys.childKey)) {
237+
if (state.getState() == TransactionState.ABORTED) {
238+
return;
239+
} else {
240+
// Conflicted.
241+
throw e;
242+
}
243+
}
244+
}
245+
// This record is to intend the transaction is aborted.
246+
putState(new Coordinator.State(id, TransactionState.ABORTED));
247+
}
248+
149249
private Get createGetWith(String id) {
150250
return new Get(new Key(Attribute.toIdValue(id)))
151251
.withConsistency(Consistency.LINEARIZABLE)

core/src/main/java/com/scalar/db/transaction/consensuscommit/RecoveryHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ private void abortIfExpired(Selection selection, TransactionResult result) {
9999
}
100100

101101
try {
102-
coordinator.putState(new Coordinator.State(result.getId(), TransactionState.ABORTED));
102+
coordinator.putStateForLazyRecoveryRollback(result.getId());
103103
rollbackRecord(selection, result);
104104
} catch (CoordinatorException e) {
105105
logger.warn("Coordinator tries to abort {}, but failed", result.getId(), e);

core/src/test/java/com/scalar/db/transaction/consensuscommit/CoordinatorTest.java

+186
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,15 @@
33
import static org.assertj.core.api.Assertions.assertThat;
44
import static org.assertj.core.api.Assertions.assertThatThrownBy;
55
import static org.mockito.ArgumentMatchers.any;
6+
import static org.mockito.ArgumentMatchers.anyList;
7+
import static org.mockito.ArgumentMatchers.anyLong;
8+
import static org.mockito.ArgumentMatchers.anyString;
9+
import static org.mockito.ArgumentMatchers.eq;
610
import static org.mockito.Mockito.doNothing;
711
import static org.mockito.Mockito.doReturn;
812
import static org.mockito.Mockito.doThrow;
913
import static org.mockito.Mockito.mock;
14+
import static org.mockito.Mockito.never;
1015
import static org.mockito.Mockito.spy;
1116
import static org.mockito.Mockito.times;
1217
import static org.mockito.Mockito.verify;
@@ -672,4 +677,185 @@ public void putStateForGroupCommit_FullIdGiven_ShouldThrowAssertionError(
672677
parentId, fullIds, transactionState, current))
673678
.isInstanceOf(IllegalArgumentException.class);
674679
}
680+
681+
@Test
682+
void putStateForLazyRecoveryRollback_NormalIdGiven_ShouldCallPutState()
683+
throws CoordinatorException {
684+
// Arrange
685+
Coordinator spiedCoordinator = spy(coordinator);
686+
687+
// Act
688+
spiedCoordinator.putStateForLazyRecoveryRollback(ANY_ID_1);
689+
690+
// Assert
691+
verify(spiedCoordinator).putState(new State(ANY_ID_1, TransactionState.ABORTED));
692+
}
693+
694+
@Test
695+
void
696+
putStateForLazyRecoveryRollback_FullIdGivenWhenTransactionIsInGroupCommitWhenGroupCommitIsNotCommitted_ShouldInsertTwoRecordsWithParentIdAndFullId()
697+
throws CoordinatorException {
698+
// Arrange
699+
Coordinator spiedCoordinator = spy(coordinator);
700+
CoordinatorGroupCommitKeyManipulator keyManipulator =
701+
new CoordinatorGroupCommitKeyManipulator();
702+
String parentId = keyManipulator.generateParentKey();
703+
String fullId = keyManipulator.fullKey(parentId, ANY_ID_1);
704+
705+
// Act
706+
spiedCoordinator.putStateForLazyRecoveryRollback(fullId);
707+
708+
// Assert
709+
verify(spiedCoordinator)
710+
.putStateForGroupCommit(
711+
eq(parentId), eq(Collections.emptyList()), eq(TransactionState.ABORTED), anyLong());
712+
verify(spiedCoordinator).putState(new State(fullId, TransactionState.ABORTED));
713+
}
714+
715+
@Test
716+
void
717+
putStateForLazyRecoveryRollback_FullIdGivenWhenTransactionIsInGroupCommitWhenGroupCommitIsCommitted_ShouldThrowCoordinatorConflictException()
718+
throws CoordinatorException {
719+
// Arrange
720+
Coordinator spiedCoordinator = spy(coordinator);
721+
CoordinatorGroupCommitKeyManipulator keyManipulator =
722+
new CoordinatorGroupCommitKeyManipulator();
723+
String parentId = keyManipulator.generateParentKey();
724+
String fullId = keyManipulator.fullKey(parentId, ANY_ID_1);
725+
726+
doThrow(CoordinatorConflictException.class)
727+
.when(spiedCoordinator)
728+
.putStateForGroupCommit(anyString(), anyList(), any(), anyLong());
729+
doReturn(
730+
Optional.of(
731+
new State(
732+
parentId,
733+
Collections.singletonList(ANY_ID_1),
734+
TransactionState.COMMITTED,
735+
System.currentTimeMillis())))
736+
.when(spiedCoordinator)
737+
.getState(parentId);
738+
739+
// Act
740+
assertThatThrownBy(() -> spiedCoordinator.putStateForLazyRecoveryRollback(fullId))
741+
.isInstanceOf(CoordinatorConflictException.class);
742+
743+
// Assert
744+
verify(spiedCoordinator)
745+
.putStateForGroupCommit(
746+
eq(parentId), eq(Collections.emptyList()), eq(TransactionState.ABORTED), anyLong());
747+
verify(spiedCoordinator, never()).putState(new State(fullId, TransactionState.ABORTED));
748+
}
749+
750+
@Test
751+
void
752+
putStateForLazyRecoveryRollback_FullIdGivenWhenTransactionIsInGroupCommitWhenGroupCommitIsAbort_ShouldDoNothing()
753+
throws CoordinatorException {
754+
// Arrange
755+
Coordinator spiedCoordinator = spy(coordinator);
756+
CoordinatorGroupCommitKeyManipulator keyManipulator =
757+
new CoordinatorGroupCommitKeyManipulator();
758+
String parentId = keyManipulator.generateParentKey();
759+
String fullId = keyManipulator.fullKey(parentId, ANY_ID_1);
760+
761+
doThrow(CoordinatorConflictException.class)
762+
.when(spiedCoordinator)
763+
.putStateForGroupCommit(anyString(), anyList(), any(), anyLong());
764+
doReturn(
765+
Optional.of(
766+
new State(
767+
parentId,
768+
Collections.singletonList(ANY_ID_1),
769+
TransactionState.ABORTED,
770+
System.currentTimeMillis())))
771+
.when(spiedCoordinator)
772+
.getState(parentId);
773+
774+
// Act
775+
spiedCoordinator.putStateForLazyRecoveryRollback(fullId);
776+
777+
// Assert
778+
verify(spiedCoordinator)
779+
.putStateForGroupCommit(
780+
eq(parentId), eq(Collections.emptyList()), eq(TransactionState.ABORTED), anyLong());
781+
verify(spiedCoordinator, never()).putState(new State(fullId, TransactionState.ABORTED));
782+
}
783+
784+
@ParameterizedTest
785+
@EnumSource(
786+
value = TransactionState.class,
787+
names = {"COMMITTED", "ABORTED"})
788+
void
789+
putStateForLazyRecoveryRollback_FullIdGivenWhenTransactionIsInDelayedGroupCommitWhenGroupCommitFinished_ShouldInsertRecordWithFullId(
790+
TransactionState transactionState) throws CoordinatorException {
791+
// Arrange
792+
Coordinator spiedCoordinator = spy(coordinator);
793+
CoordinatorGroupCommitKeyManipulator keyManipulator =
794+
new CoordinatorGroupCommitKeyManipulator();
795+
String parentId = keyManipulator.generateParentKey();
796+
String fullId = keyManipulator.fullKey(parentId, ANY_ID_1);
797+
798+
doThrow(CoordinatorConflictException.class)
799+
.when(spiedCoordinator)
800+
.putStateForGroupCommit(anyString(), anyList(), any(), anyLong());
801+
doReturn(
802+
Optional.of(
803+
new State(
804+
parentId,
805+
Collections.singletonList("other-id"),
806+
transactionState,
807+
System.currentTimeMillis())))
808+
.when(spiedCoordinator)
809+
.getState(parentId);
810+
811+
// Act
812+
spiedCoordinator.putStateForLazyRecoveryRollback(fullId);
813+
814+
// Assert
815+
verify(spiedCoordinator)
816+
.putStateForGroupCommit(
817+
eq(parentId), eq(Collections.emptyList()), eq(TransactionState.ABORTED), anyLong());
818+
verify(spiedCoordinator).putState(new State(fullId, TransactionState.ABORTED));
819+
}
820+
821+
@ParameterizedTest
822+
@EnumSource(
823+
value = TransactionState.class,
824+
names = {"COMMITTED", "ABORTED"})
825+
void
826+
putStateForLazyRecoveryRollback_FullIdGivenWhenTransactionIsInDelayedGroupCommitWhenGroupCommitAndDelayedGroupCommitFinished_ShouldCoordinatorConflictException(
827+
TransactionState transactionState) throws CoordinatorException {
828+
// Arrange
829+
Coordinator spiedCoordinator = spy(coordinator);
830+
CoordinatorGroupCommitKeyManipulator keyManipulator =
831+
new CoordinatorGroupCommitKeyManipulator();
832+
String parentId = keyManipulator.generateParentKey();
833+
String fullId = keyManipulator.fullKey(parentId, ANY_ID_1);
834+
835+
doThrow(CoordinatorConflictException.class)
836+
.when(spiedCoordinator)
837+
.putStateForGroupCommit(anyString(), anyList(), any(), anyLong());
838+
doReturn(
839+
Optional.of(
840+
new State(
841+
parentId,
842+
Collections.singletonList("other-id"),
843+
transactionState,
844+
System.currentTimeMillis())))
845+
.when(spiedCoordinator)
846+
.getState(parentId);
847+
doThrow(CoordinatorConflictException.class)
848+
.when(spiedCoordinator)
849+
.putState(new State(fullId, TransactionState.ABORTED));
850+
851+
// Act
852+
assertThatThrownBy(() -> spiedCoordinator.putStateForLazyRecoveryRollback(fullId))
853+
.isInstanceOf(CoordinatorConflictException.class);
854+
855+
// Assert
856+
verify(spiedCoordinator)
857+
.putStateForGroupCommit(
858+
eq(parentId), eq(Collections.emptyList()), eq(TransactionState.ABORTED), anyLong());
859+
verify(spiedCoordinator).putState(new State(fullId, TransactionState.ABORTED));
860+
}
675861
}

core/src/test/java/com/scalar/db/transaction/consensuscommit/RecoveryHandlerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public void recover_SelectionAndResultGivenWhenCoordinatorStateNotExistsAndExpir
134134
handler.recover(selection, result);
135135

136136
// Assert
137-
verify(coordinator).putState(new Coordinator.State(ANY_ID_1, TransactionState.ABORTED));
137+
verify(coordinator).putStateForLazyRecoveryRollback(ANY_ID_1);
138138
verify(handler).rollbackRecord(selection, result);
139139
}
140140
}

0 commit comments

Comments
 (0)