fix(analytics): serialize delete recalculations per user
Prevent concurrent achievement and summary rebuilds for the same user when multiple activities are deleted in quick succession. Signed-off-by: Marcus Fihlon <marcus@fihlon.swiss>
This commit is contained in:
parent
2ae0eeb06b
commit
714007aabe
2 changed files with 86 additions and 5 deletions
|
|
@ -7,6 +7,13 @@ import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.event.TransactionPhase;
|
import org.springframework.transaction.event.TransactionPhase;
|
||||||
import org.springframework.transaction.event.TransactionalEventListener;
|
import org.springframework.transaction.event.TransactionalEventListener;
|
||||||
|
|
||||||
|
import java.time.LocalDate;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Recalculates derived analytics after an activity has been deleted.
|
* Recalculates derived analytics after an activity has been deleted.
|
||||||
*/
|
*/
|
||||||
|
|
@ -17,14 +24,59 @@ public class ActivityDeleteRecalculationService {
|
||||||
|
|
||||||
private final AchievementService achievementService;
|
private final AchievementService achievementService;
|
||||||
private final ActivitySummaryService activitySummaryService;
|
private final ActivitySummaryService activitySummaryService;
|
||||||
|
private final ConcurrentMap<UUID, PendingUserRecalculation> pendingRecalculations = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
@Async
|
@Async
|
||||||
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
|
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
|
||||||
public void handleActivityDeleted(ActivityDeletedEvent event) {
|
public void handleActivityDeleted(ActivityDeletedEvent event) {
|
||||||
|
PendingUserRecalculation pending = pendingRecalculations.computeIfAbsent(
|
||||||
|
event.userId(),
|
||||||
|
ignored -> new PendingUserRecalculation()
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!pending.enqueueAndShouldStart(event.activityDate())) {
|
||||||
|
log.debug("Queued additional activity delete recalculation for user {}", event.userId());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
do {
|
||||||
|
Set<LocalDate> datesToRecalculate = pending.drainDates();
|
||||||
achievementService.rebuildAchievementsForUser(event.userId());
|
achievementService.rebuildAchievementsForUser(event.userId());
|
||||||
activitySummaryService.updateWeeklySummary(event.userId(), event.activityDate());
|
for (LocalDate date : datesToRecalculate) {
|
||||||
activitySummaryService.updateMonthlySummary(event.userId(), event.activityDate());
|
activitySummaryService.updateWeeklySummary(event.userId(), date);
|
||||||
activitySummaryService.updateYearlySummary(event.userId(), event.activityDate());
|
activitySummaryService.updateMonthlySummary(event.userId(), date);
|
||||||
log.info("Recalculated achievements and summaries after deleting activity for user {}", event.userId());
|
activitySummaryService.updateYearlySummary(event.userId(), date);
|
||||||
|
}
|
||||||
|
} while (pending.keepProcessing());
|
||||||
|
|
||||||
|
log.info("Recalculated achievements and summaries after deleting activities for user {}", event.userId());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class PendingUserRecalculation {
|
||||||
|
private final Set<LocalDate> pendingDates = new HashSet<>();
|
||||||
|
private boolean processing;
|
||||||
|
|
||||||
|
synchronized boolean enqueueAndShouldStart(LocalDate activityDate) {
|
||||||
|
pendingDates.add(activityDate);
|
||||||
|
if (processing) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
processing = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized Set<LocalDate> drainDates() {
|
||||||
|
Set<LocalDate> dates = new HashSet<>(pendingDates);
|
||||||
|
pendingDates.clear();
|
||||||
|
return dates;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized boolean keepProcessing() {
|
||||||
|
if (pendingDates.isEmpty()) {
|
||||||
|
processing = false;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,10 @@ import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
import java.time.LocalDate;
|
import java.time.LocalDate;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
@ExtendWith(MockitoExtension.class)
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
|
@ -37,4 +40,30 @@ class ActivityDeleteRecalculationServiceTest {
|
||||||
verify(activitySummaryService).updateMonthlySummary(userId, activityDate);
|
verify(activitySummaryService).updateMonthlySummary(userId, activityDate);
|
||||||
verify(activitySummaryService).updateYearlySummary(userId, activityDate);
|
verify(activitySummaryService).updateYearlySummary(userId, activityDate);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("Should serialize recalculations per user and replay queued deletions")
|
||||||
|
void shouldSerializeRecalculationsPerUserAndReplayQueuedDeletions() {
|
||||||
|
UUID userId = UUID.randomUUID();
|
||||||
|
LocalDate firstDate = LocalDate.of(2025, 12, 3);
|
||||||
|
LocalDate secondDate = LocalDate.of(2025, 12, 4);
|
||||||
|
AtomicBoolean queuedSecondDelete = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
doAnswer(invocation -> {
|
||||||
|
if (queuedSecondDelete.compareAndSet(false, true)) {
|
||||||
|
activityDeleteRecalculationService.handleActivityDeleted(new ActivityDeletedEvent(userId, secondDate));
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}).when(achievementService).rebuildAchievementsForUser(userId);
|
||||||
|
|
||||||
|
activityDeleteRecalculationService.handleActivityDeleted(new ActivityDeletedEvent(userId, firstDate));
|
||||||
|
|
||||||
|
verify(achievementService, times(2)).rebuildAchievementsForUser(userId);
|
||||||
|
verify(activitySummaryService).updateWeeklySummary(userId, firstDate);
|
||||||
|
verify(activitySummaryService).updateMonthlySummary(userId, firstDate);
|
||||||
|
verify(activitySummaryService).updateYearlySummary(userId, firstDate);
|
||||||
|
verify(activitySummaryService).updateWeeklySummary(userId, secondDate);
|
||||||
|
verify(activitySummaryService).updateMonthlySummary(userId, secondDate);
|
||||||
|
verify(activitySummaryService).updateYearlySummary(userId, secondDate);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue