Put Federation on async executor

This commit is contained in:
Tim Zöller 2026-04-07 19:29:43 +02:00
parent 662363555b
commit fde80672f2
2 changed files with 131 additions and 31 deletions

View file

@ -2,9 +2,12 @@ package net.javahippie.fitpub;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.hc.client5.http.classic.HttpClient; import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.io.HttpClientConnectionManager; import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.core5.util.Timeout;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
@ -32,15 +35,38 @@ public class FitPubApplication {
/** /**
* REST template for making HTTP requests to remote ActivityPub servers. * REST template for making HTTP requests to remote ActivityPub servers.
*
* <p>Configures explicit connect/socket/response timeouts. Without these, a slow or
* unresponsive remote inbox can hang the calling thread indefinitely (HttpClient's
* default is no timeout). Federation deliveries run on the request thread for some
* outbound activities, so a hung remote would otherwise block the user's HTTP
* response. Values chosen to be generous enough for healthy peers but bounded
* enough that one bad peer can't stall a request beyond a few seconds.
*/ */
@Bean @Bean
public RestTemplate restTemplate() { public RestTemplate restTemplate() {
// Use Apache HttpClient with custom configuration // Connection-level timeouts: how long to wait for the TCP / TLS handshake
// and how long to wait for data on an established socket.
ConnectionConfig connectionConfig = ConnectionConfig.custom()
.setConnectTimeout(Timeout.ofSeconds(5))
.setSocketTimeout(Timeout.ofSeconds(10))
.build();
HttpClientConnectionManager connectionManager = PoolingHttpClientConnectionManagerBuilder.create() HttpClientConnectionManager connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
.setDefaultConnectionConfig(connectionConfig)
.build();
// Request-level timeouts: how long to wait for a connection from the pool,
// how long to wait for the first response byte, and the overall connect cap.
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(Timeout.ofSeconds(5))
.setConnectionRequestTimeout(Timeout.ofSeconds(2))
.setResponseTimeout(Timeout.ofSeconds(10))
.build(); .build();
HttpClient httpClient = HttpClientBuilder.create() HttpClient httpClient = HttpClientBuilder.create()
.setConnectionManager(connectionManager) .setConnectionManager(connectionManager)
.setDefaultRequestConfig(requestConfig)
.disableRedirectHandling() // Don't follow redirects (important for federation) .disableRedirectHandling() // Don't follow redirects (important for federation)
.build(); .build();

View file

@ -14,7 +14,10 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity; import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
@ -40,7 +43,21 @@ public class FederationService {
private final UserRepository userRepository; private final UserRepository userRepository;
private final HttpSignatureValidator signatureValidator; private final HttpSignatureValidator signatureValidator;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final RestTemplate restTemplate = new RestTemplate(); // Injected via constructor (Lombok @RequiredArgsConstructor) so federation HTTP calls
// go through the application-wide RestTemplate bean which carries connect/socket
// /response timeouts. Previously this was `new RestTemplate()` with no timeouts at all.
private final RestTemplate restTemplate;
/**
* Self-reference for invoking {@code @Async} methods on this service from other
* methods in the same class. Spring's AOP proxy is bypassed when you call
* {@code this.someAsyncMethod()} directly, so any {@code @Async} call from inside
* the class must go through this proxy reference. {@code @Lazy} avoids the
* circular constructor injection that would otherwise happen.
*/
@Autowired
@Lazy
private FederationService self;
@Value("${fitpub.base-url}") @Value("${fitpub.base-url}")
private String baseUrl; private String baseUrl;
@ -120,21 +137,64 @@ public class FederationService {
} }
/** /**
* Send a Follow activity to a remote actor. * Record a follow request to a remote actor.
*
* <p>Synchronously: validates the remote actor exists (via {@link #fetchRemoteActor},
* which is cached so subsequent calls are fast), then writes a local PENDING
* {@code Follow} row with a freshly-generated activity ID.
*
* <p>Asynchronously: hands off the actual HTTP-signed Follow delivery to
* {@link #deliverFollowActivityAsync} on the {@code taskExecutor} thread pool.
* The user's HTTP response no longer waits for the federated server to ack
* if the remote is briefly unreachable the local PENDING row remains and a
* future delivery retry could pick it up.
*
* <p>This is a behaviour change vs. the previous version, which sent the HTTP
* activity first and only saved the local row on success. The new ordering means
* a follow attempt to a temporarily unreachable server is preserved locally
* instead of failing the user's request entirely.
* *
* @param remoteActorUri the URI of the remote actor to follow * @param remoteActorUri the URI of the remote actor to follow
* @param localUser the local user initiating the follow * @param localUser the local user initiating the follow
*/ */
@Transactional @Transactional
public void sendFollowActivity(String remoteActorUri, User localUser) { public void sendFollowActivity(String remoteActorUri, User localUser) {
log.info("Recording Follow request from {} to {}", localUser.getUsername(), remoteActorUri);
// 1. Validate remote actor exists. If the URI is bogus or the remote is hard
// unreachable on a cold cache, this throws and the controller surfaces it
// to the user. Cached lookups make this near-instant for re-follows.
RemoteActor remoteActor = fetchRemoteActor(remoteActorUri);
// 2. Save the local PENDING follow row immediately so the user's intent is
// persisted even if HTTP delivery later fails. The status flips to ACCEPTED
// when we receive the Accept activity in the inbox.
String followId = baseUrl + "/activities/follow/" + UUID.randomUUID();
Follow follow = Follow.builder()
.followerId(localUser.getId())
.followingActorUri(remoteActorUri)
.status(Follow.FollowStatus.PENDING)
.activityId(followId)
.build();
followRepository.save(follow);
// 3. Fire the actual HTTP delivery off the request thread. Errors inside
// the async helper are logged but cannot affect the local row.
self.deliverFollowActivityAsync(followId, remoteActorUri, remoteActor.getInboxUrl(), localUser);
}
/**
* Background helper for {@link #sendFollowActivity}: builds the ActivityPub
* Follow envelope and POSTs it to the remote actor's inbox. Runs on the
* {@code taskExecutor} thread pool. Failures are logged and swallowed; the
* caller has already returned by the time this runs.
*
* <p>Must be invoked through the {@link #self} proxy reference (not via
* {@code this}) so the {@code @Async} aspect actually applies.
*/
@Async("taskExecutor")
public void deliverFollowActivityAsync(String followId, String remoteActorUri, String inboxUrl, User localUser) {
try { try {
log.info("Sending Follow activity from {} to {}", localUser.getUsername(), remoteActorUri);
// 1. Fetch remote actor to get inbox URL and cache their info
RemoteActor remoteActor = fetchRemoteActor(remoteActorUri);
// 2. Create Follow activity
String followId = baseUrl + "/activities/follow/" + UUID.randomUUID();
String actorUri = baseUrl + "/users/" + localUser.getUsername(); String actorUri = baseUrl + "/users/" + localUser.getUsername();
Map<String, Object> followActivity = new HashMap<>(); Map<String, Object> followActivity = new HashMap<>();
@ -145,34 +205,24 @@ public class FederationService {
followActivity.put("object", remoteActorUri); followActivity.put("object", remoteActorUri);
followActivity.put("published", Instant.now().toString()); followActivity.put("published", Instant.now().toString());
// 3. Send to remote actor's inbox (HTTP-signed) sendActivity(inboxUrl, followActivity, localUser);
sendActivity(remoteActor.getInboxUrl(), followActivity, localUser); log.info("Follow activity delivered: {} -> {}", localUser.getUsername(), remoteActorUri);
// 4. Create local follow record with PENDING status
// The status will be updated to ACCEPTED when we receive an Accept activity
Follow follow = Follow.builder()
.followerId(localUser.getId())
.followingActorUri(remoteActorUri)
.status(Follow.FollowStatus.PENDING)
.activityId(followId)
.build();
followRepository.save(follow);
log.info("Follow activity sent successfully: {} -> {}", localUser.getUsername(), remoteActorUri);
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to send Follow activity from {} to {}", localUser.getUsername(), remoteActorUri, e); log.error("Failed to deliver Follow activity from {} to {}", localUser.getUsername(), remoteActorUri, e);
throw new RuntimeException("Failed to send Follow activity", e);
} }
} }
/** /**
* Send an Accept activity in response to a Follow. * Send an Accept activity in response to a Follow.
* *
* <p>Runs on the {@code taskExecutor} pool. The inbox handler that triggers this
* needs to ack the federated sender with 202 quickly; we don't want the ack to
* wait on another HTTP roundtrip back to the sender's inbox.
*
* @param follow the follow relationship * @param follow the follow relationship
* @param localUser the local user being followed * @param localUser the local user being followed
*/ */
@Transactional @Async("taskExecutor")
public void sendAcceptActivity(Follow follow, User localUser) { public void sendAcceptActivity(Follow follow, User localUser) {
try { try {
// Get the remote actor who sent the follow request // Get the remote actor who sent the follow request
@ -293,12 +343,19 @@ public class FederationService {
/** /**
* Send a Create activity for a new post/object. * Send a Create activity for a new post/object.
* *
* <p>Like the other federation send methods, this runs on the {@code taskExecutor}
* thread pool so that user-facing actions (e.g. posting a comment) don't block on
* federation HTTP delivery. The activity-publish path already wraps this in
* {@link net.javahippie.fitpub.service.ActivityPostProcessingService#publishToFederationAsync};
* the additional {@code @Async} here mainly benefits the comment path, which used
* to call this synchronously on the request thread.
*
* @param objectId the ID of the created object * @param objectId the ID of the created object
* @param object the object being created (activity, note, etc.) * @param object the object being created (activity, note, etc.)
* @param sender the local user creating the object * @param sender the local user creating the object
* @param toPublic whether to send to public (CC followers) * @param toPublic whether to send to public (CC followers)
*/ */
@Transactional @Async("taskExecutor")
public void sendCreateActivity(String objectId, Map<String, Object> object, User sender, boolean toPublic) { public void sendCreateActivity(String objectId, Map<String, Object> object, User sender, boolean toPublic) {
try { try {
String createId = baseUrl + "/activities/create/" + UUID.randomUUID(); String createId = baseUrl + "/activities/create/" + UUID.randomUUID();
@ -341,11 +398,19 @@ public class FederationService {
* Mastodon ignores the content and shows it as a regular like graceful * Mastodon ignores the content and shows it as a regular like graceful
* degradation in both directions. * degradation in both directions.
* *
* <p><strong>Runs on the {@code taskExecutor} thread pool</strong>: federation
* delivery requires HTTP calls to every follower's inbox (with TLS handshake +
* RSA signing per delivery). Doing that on the request thread used to add ~1.3 s
* to a single reaction click on prod. Fire-and-forget here means the user gets
* their HTTP response as soon as the local DB writes commit; federation runs in
* the background. Errors are logged in {@link #sendActivity} but never propagated
* to the caller.
*
* @param objectUri the URI of the object being liked * @param objectUri the URI of the object being liked
* @param sender the local user reacting to the object * @param sender the local user reacting to the object
* @param emoji the reaction emoji (must be from {@link net.javahippie.fitpub.model.ReactionEmoji#PALETTE}) * @param emoji the reaction emoji (must be from {@link net.javahippie.fitpub.model.ReactionEmoji#PALETTE})
*/ */
@Transactional @Async("taskExecutor")
public void sendLikeActivity(String objectUri, User sender, String emoji) { public void sendLikeActivity(String objectUri, User sender, String emoji) {
try { try {
String likeId = baseUrl + "/activities/like/" + UUID.randomUUID(); String likeId = baseUrl + "/activities/like/" + UUID.randomUUID();
@ -377,10 +442,15 @@ public class FederationService {
* Send Undo Follow activity to remote actor's inbox. * Send Undo Follow activity to remote actor's inbox.
* This notifies the remote server that we're unfollowing them. * This notifies the remote server that we're unfollowing them.
* *
* <p>Runs on the {@code taskExecutor} pool. The unfollow controller continues
* with the local follow row deletion regardless of whether this delivery
* succeeds, so the user's HTTP response shouldn't wait on it.
*
* @param remoteActorUri the actor URI being unfollowed * @param remoteActorUri the actor URI being unfollowed
* @param localUser the local user who is unfollowing * @param localUser the local user who is unfollowing
* @param originalFollowActivityId the ID of the original Follow activity * @param originalFollowActivityId the ID of the original Follow activity
*/ */
@Async("taskExecutor")
public void sendUndoFollowActivity(String remoteActorUri, User localUser, String originalFollowActivityId) { public void sendUndoFollowActivity(String remoteActorUri, User localUser, String originalFollowActivityId) {
try { try {
log.info("Sending Undo Follow activity from {} to {}", localUser.getUsername(), remoteActorUri); log.info("Sending Undo Follow activity from {} to {}", localUser.getUsername(), remoteActorUri);
@ -421,11 +491,15 @@ public class FederationService {
/** /**
* Send an Undo activity (for unlike, unfollow, etc.). * Send an Undo activity (for unlike, unfollow, etc.).
* *
* <p>Like {@link #sendLikeActivity}, this is fire-and-forget on the
* {@code taskExecutor} thread pool: the unlike HTTP response shouldn't wait for
* federation HTTP calls.
*
* @param originalActivityId the ID of the activity being undone * @param originalActivityId the ID of the activity being undone
* @param originalActivity the original activity being undone * @param originalActivity the original activity being undone
* @param sender the local user undoing the activity * @param sender the local user undoing the activity
*/ */
@Transactional @Async("taskExecutor")
public void sendUndoActivity(String originalActivityId, Map<String, Object> originalActivity, User sender) { public void sendUndoActivity(String originalActivityId, Map<String, Object> originalActivity, User sender) {
try { try {
String undoId = baseUrl + "/activities/undo/" + UUID.randomUUID(); String undoId = baseUrl + "/activities/undo/" + UUID.randomUUID();