From fde80672f25b65e26da9888c39d7f234a7462922 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Z=C3=B6ller?= Date: Tue, 7 Apr 2026 19:29:43 +0200 Subject: [PATCH] Put Federation on async executor --- .../javahippie/fitpub/FitPubApplication.java | 28 +++- .../fitpub/service/FederationService.java | 134 ++++++++++++++---- 2 files changed, 131 insertions(+), 31 deletions(-) diff --git a/src/main/java/net/javahippie/fitpub/FitPubApplication.java b/src/main/java/net/javahippie/fitpub/FitPubApplication.java index 8f4b0e3..79756a4 100644 --- a/src/main/java/net/javahippie/fitpub/FitPubApplication.java +++ b/src/main/java/net/javahippie/fitpub/FitPubApplication.java @@ -2,9 +2,12 @@ package net.javahippie.fitpub; import lombok.extern.slf4j.Slf4j; import org.apache.hc.client5.http.classic.HttpClient; +import org.apache.hc.client5.http.config.ConnectionConfig; +import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder; import org.apache.hc.client5.http.io.HttpClientConnectionManager; +import org.apache.hc.core5.util.Timeout; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @@ -32,15 +35,38 @@ public class FitPubApplication { /** * REST template for making HTTP requests to remote ActivityPub servers. + * + *

Configures explicit connect/socket/response timeouts. Without these, a slow or + * unresponsive remote inbox can hang the calling thread indefinitely (HttpClient's + * default is no timeout). Federation deliveries run on the request thread for some + * outbound activities, so a hung remote would otherwise block the user's HTTP + * response. Values chosen to be generous enough for healthy peers but bounded + * enough that one bad peer can't stall a request beyond a few seconds. */ @Bean public RestTemplate restTemplate() { - // Use Apache HttpClient with custom configuration + // Connection-level timeouts: how long to wait for the TCP / TLS handshake + // and how long to wait for data on an established socket. + ConnectionConfig connectionConfig = ConnectionConfig.custom() + .setConnectTimeout(Timeout.ofSeconds(5)) + .setSocketTimeout(Timeout.ofSeconds(10)) + .build(); + HttpClientConnectionManager connectionManager = PoolingHttpClientConnectionManagerBuilder.create() + .setDefaultConnectionConfig(connectionConfig) + .build(); + + // Request-level timeouts: how long to wait for a connection from the pool, + // how long to wait for the first response byte, and the overall connect cap. + RequestConfig requestConfig = RequestConfig.custom() + .setConnectTimeout(Timeout.ofSeconds(5)) + .setConnectionRequestTimeout(Timeout.ofSeconds(2)) + .setResponseTimeout(Timeout.ofSeconds(10)) .build(); HttpClient httpClient = HttpClientBuilder.create() .setConnectionManager(connectionManager) + .setDefaultRequestConfig(requestConfig) .disableRedirectHandling() // Don't follow redirects (important for federation) .build(); diff --git a/src/main/java/net/javahippie/fitpub/service/FederationService.java b/src/main/java/net/javahippie/fitpub/service/FederationService.java index c6c797f..ff41cf4 100644 --- a/src/main/java/net/javahippie/fitpub/service/FederationService.java +++ b/src/main/java/net/javahippie/fitpub/service/FederationService.java @@ -14,7 +14,10 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.springframework.http.ResponseEntity; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.client.RestTemplate; @@ -40,7 +43,21 @@ public class FederationService { private final UserRepository userRepository; private final HttpSignatureValidator signatureValidator; private final ObjectMapper objectMapper; - private final RestTemplate restTemplate = new RestTemplate(); + // Injected via constructor (Lombok @RequiredArgsConstructor) so federation HTTP calls + // go through the application-wide RestTemplate bean — which carries connect/socket + // /response timeouts. Previously this was `new RestTemplate()` with no timeouts at all. + private final RestTemplate restTemplate; + + /** + * Self-reference for invoking {@code @Async} methods on this service from other + * methods in the same class. Spring's AOP proxy is bypassed when you call + * {@code this.someAsyncMethod()} directly, so any {@code @Async} call from inside + * the class must go through this proxy reference. {@code @Lazy} avoids the + * circular constructor injection that would otherwise happen. + */ + @Autowired + @Lazy + private FederationService self; @Value("${fitpub.base-url}") private String baseUrl; @@ -120,21 +137,64 @@ public class FederationService { } /** - * Send a Follow activity to a remote actor. + * Record a follow request to a remote actor. + * + *

Synchronously: validates the remote actor exists (via {@link #fetchRemoteActor}, + * which is cached so subsequent calls are fast), then writes a local PENDING + * {@code Follow} row with a freshly-generated activity ID. + * + *

Asynchronously: hands off the actual HTTP-signed Follow delivery to + * {@link #deliverFollowActivityAsync} on the {@code taskExecutor} thread pool. + * The user's HTTP response no longer waits for the federated server to ack — + * if the remote is briefly unreachable the local PENDING row remains and a + * future delivery retry could pick it up. + * + *

This is a behaviour change vs. the previous version, which sent the HTTP + * activity first and only saved the local row on success. The new ordering means + * a follow attempt to a temporarily unreachable server is preserved locally + * instead of failing the user's request entirely. * * @param remoteActorUri the URI of the remote actor to follow * @param localUser the local user initiating the follow */ @Transactional public void sendFollowActivity(String remoteActorUri, User localUser) { + log.info("Recording Follow request from {} to {}", localUser.getUsername(), remoteActorUri); + + // 1. Validate remote actor exists. If the URI is bogus or the remote is hard + // unreachable on a cold cache, this throws and the controller surfaces it + // to the user. Cached lookups make this near-instant for re-follows. + RemoteActor remoteActor = fetchRemoteActor(remoteActorUri); + + // 2. Save the local PENDING follow row immediately so the user's intent is + // persisted even if HTTP delivery later fails. The status flips to ACCEPTED + // when we receive the Accept activity in the inbox. + String followId = baseUrl + "/activities/follow/" + UUID.randomUUID(); + Follow follow = Follow.builder() + .followerId(localUser.getId()) + .followingActorUri(remoteActorUri) + .status(Follow.FollowStatus.PENDING) + .activityId(followId) + .build(); + followRepository.save(follow); + + // 3. Fire the actual HTTP delivery off the request thread. Errors inside + // the async helper are logged but cannot affect the local row. + self.deliverFollowActivityAsync(followId, remoteActorUri, remoteActor.getInboxUrl(), localUser); + } + + /** + * Background helper for {@link #sendFollowActivity}: builds the ActivityPub + * Follow envelope and POSTs it to the remote actor's inbox. Runs on the + * {@code taskExecutor} thread pool. Failures are logged and swallowed; the + * caller has already returned by the time this runs. + * + *

Must be invoked through the {@link #self} proxy reference (not via + * {@code this}) so the {@code @Async} aspect actually applies. + */ + @Async("taskExecutor") + public void deliverFollowActivityAsync(String followId, String remoteActorUri, String inboxUrl, User localUser) { try { - log.info("Sending Follow activity from {} to {}", localUser.getUsername(), remoteActorUri); - - // 1. Fetch remote actor to get inbox URL and cache their info - RemoteActor remoteActor = fetchRemoteActor(remoteActorUri); - - // 2. Create Follow activity - String followId = baseUrl + "/activities/follow/" + UUID.randomUUID(); String actorUri = baseUrl + "/users/" + localUser.getUsername(); Map followActivity = new HashMap<>(); @@ -145,34 +205,24 @@ public class FederationService { followActivity.put("object", remoteActorUri); followActivity.put("published", Instant.now().toString()); - // 3. Send to remote actor's inbox (HTTP-signed) - sendActivity(remoteActor.getInboxUrl(), followActivity, localUser); - - // 4. Create local follow record with PENDING status - // The status will be updated to ACCEPTED when we receive an Accept activity - Follow follow = Follow.builder() - .followerId(localUser.getId()) - .followingActorUri(remoteActorUri) - .status(Follow.FollowStatus.PENDING) - .activityId(followId) - .build(); - followRepository.save(follow); - - log.info("Follow activity sent successfully: {} -> {}", localUser.getUsername(), remoteActorUri); - + sendActivity(inboxUrl, followActivity, localUser); + log.info("Follow activity delivered: {} -> {}", localUser.getUsername(), remoteActorUri); } catch (Exception e) { - log.error("Failed to send Follow activity from {} to {}", localUser.getUsername(), remoteActorUri, e); - throw new RuntimeException("Failed to send Follow activity", e); + log.error("Failed to deliver Follow activity from {} to {}", localUser.getUsername(), remoteActorUri, e); } } /** * Send an Accept activity in response to a Follow. * + *

Runs on the {@code taskExecutor} pool. The inbox handler that triggers this + * needs to ack the federated sender with 202 quickly; we don't want the ack to + * wait on another HTTP roundtrip back to the sender's inbox. + * * @param follow the follow relationship * @param localUser the local user being followed */ - @Transactional + @Async("taskExecutor") public void sendAcceptActivity(Follow follow, User localUser) { try { // Get the remote actor who sent the follow request @@ -293,12 +343,19 @@ public class FederationService { /** * Send a Create activity for a new post/object. * + *

Like the other federation send methods, this runs on the {@code taskExecutor} + * thread pool so that user-facing actions (e.g. posting a comment) don't block on + * federation HTTP delivery. The activity-publish path already wraps this in + * {@link net.javahippie.fitpub.service.ActivityPostProcessingService#publishToFederationAsync}; + * the additional {@code @Async} here mainly benefits the comment path, which used + * to call this synchronously on the request thread. + * * @param objectId the ID of the created object * @param object the object being created (activity, note, etc.) * @param sender the local user creating the object * @param toPublic whether to send to public (CC followers) */ - @Transactional + @Async("taskExecutor") public void sendCreateActivity(String objectId, Map object, User sender, boolean toPublic) { try { String createId = baseUrl + "/activities/create/" + UUID.randomUUID(); @@ -341,11 +398,19 @@ public class FederationService { * Mastodon ignores the content and shows it as a regular like — graceful * degradation in both directions. * + *

Runs on the {@code taskExecutor} thread pool: federation + * delivery requires HTTP calls to every follower's inbox (with TLS handshake + + * RSA signing per delivery). Doing that on the request thread used to add ~1.3 s + * to a single reaction click on prod. Fire-and-forget here means the user gets + * their HTTP response as soon as the local DB writes commit; federation runs in + * the background. Errors are logged in {@link #sendActivity} but never propagated + * to the caller. + * * @param objectUri the URI of the object being liked * @param sender the local user reacting to the object * @param emoji the reaction emoji (must be from {@link net.javahippie.fitpub.model.ReactionEmoji#PALETTE}) */ - @Transactional + @Async("taskExecutor") public void sendLikeActivity(String objectUri, User sender, String emoji) { try { String likeId = baseUrl + "/activities/like/" + UUID.randomUUID(); @@ -377,10 +442,15 @@ public class FederationService { * Send Undo Follow activity to remote actor's inbox. * This notifies the remote server that we're unfollowing them. * + *

Runs on the {@code taskExecutor} pool. The unfollow controller continues + * with the local follow row deletion regardless of whether this delivery + * succeeds, so the user's HTTP response shouldn't wait on it. + * * @param remoteActorUri the actor URI being unfollowed * @param localUser the local user who is unfollowing * @param originalFollowActivityId the ID of the original Follow activity */ + @Async("taskExecutor") public void sendUndoFollowActivity(String remoteActorUri, User localUser, String originalFollowActivityId) { try { log.info("Sending Undo Follow activity from {} to {}", localUser.getUsername(), remoteActorUri); @@ -421,11 +491,15 @@ public class FederationService { /** * Send an Undo activity (for unlike, unfollow, etc.). * + *

Like {@link #sendLikeActivity}, this is fire-and-forget on the + * {@code taskExecutor} thread pool: the unlike HTTP response shouldn't wait for + * federation HTTP calls. + * * @param originalActivityId the ID of the activity being undone * @param originalActivity the original activity being undone * @param sender the local user undoing the activity */ - @Transactional + @Async("taskExecutor") public void sendUndoActivity(String originalActivityId, Map originalActivity, User sender) { try { String undoId = baseUrl + "/activities/undo/" + UUID.randomUUID();