Concurrencia

☕ Java Moderno & Concurrencia en Spring Boot

Streams · Lambdas · Async · Reactive · Thread-Safety en el contexto real de Spring


Objetivo: Dominar Java moderno y concurrencia aplicados directamente al desarrollo con Spring Boot. Cada concepto va acompañado de su uso real en servicios, controladores, repositorios y configuraciones de Spring.


📌 ÍNDICE

  1. @Async y AsyncConfigurer
  2. CompletableFuture en Spring
  3. Spring WebFlux — Programación Reactiva
  4. Virtual Threads en Spring Boot 3.2+
  5. Transacciones y Concurrencia
  6. Caché, Sesiones y Thread-Safety en Spring
  7. Spring Batch — Procesamiento en Paralelo
  8. Eventos y Mensajería Asíncrona
  9. Problemas Comunes y Cómo Evitarlos

5. @Async y AsyncConfigurer

Configurar el ThreadPool para @Async

// Por defecto, @Async usa SimpleAsyncTaskExecutor (un thread por tarea), que es lento
// Siempre es recomendable configurar un propio ThreadPoolTaskExecutor
 
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
 
    @Override
    @Bean("taskExecutor")
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 
        // Número de CPUs disponibles
        int cpus = Runtime.getRuntime().availableProcessors();
 
        executor.setCorePoolSize(cpus);          // hilos siempre activos
        executor.setMaxPoolSize(cpus * 2);       // máximo bajo carga
        executor.setQueueCapacity(500);          // cola de tareas en espera
        executor.setKeepAliveSeconds(60);        // tiempo de vida de hilos extras
        executor.setThreadNamePrefix("async-");  // nombre para debugging
        executor.setWaitForTasksToCompleteOnShutdown(true);  // graceful shutdown
        executor.setAwaitTerminationSeconds(30);
        executor.initialize();
        return executor;
    }
 
    // Executor separado para tareas de I/O (más hilos porque bloquean)
    @Bean("ioExecutor")
    public Executor ioExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(20);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(200);
        executor.setThreadNamePrefix("io-async-");
        executor.initialize();
        return executor;
    }
 
    // Manejo de excepciones no capturadas en métodos @Async
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> {
            log.error("Error en método async: {}.{}() — {}",
                method.getDeclaringClass().getSimpleName(),
                method.getName(),
                ex.getMessage(), ex);
            // Aquí podrías enviar alerta, guardar en BD, etc.
        };
    }
}

Usar @Async en servicios

@Service
@RequiredArgsConstructor
@Slf4j
public class NotificacionService {
 
    private final EmailSender emailSender;
    private final PushSender pushSender;
 
    // ① @Async sin retorno — "fire and forget"
    @Async("ioExecutor")
    public void enviarEmailBienvenida(String email, String nombre) {
        log.info("Enviando email a {} en hilo: {}", email, Thread.currentThread().getName());
        emailSender.enviar(email, "Bienvenido " + nombre, prepararTemplate(nombre));
    }
 
    // ② @Async con CompletableFuture — permite obtener el resultado
    @Async("ioExecutor")
    public CompletableFuture<Boolean> enviarConConfirmacion(String email) {
        try {
            emailSender.enviar(email, "Confirmación", "...");
            return CompletableFuture.completedFuture(true);
        } catch (Exception ex) {
            log.error("Error enviando email a {}", email, ex);
            return CompletableFuture.completedFuture(false);
        }
    }
 
    // ③ @Async que retorna Future (antiguo, menos recomendado)
    @Async
    public Future<String> procesarConFuture(String datos) {
        String resultado = procesarDatos(datos);
        return new AsyncResult<>(resultado);
    }
}
 
// REGLA IMPORTANTE: @Async solo funciona cuando el método es llamado
// DESDE OTRO BEAN (no desde el mismo bean — ya que Spring usa proxies)
 
@Service
public class PedidoService {
 
    @Autowired
    private NotificacionService notificaciones;  // bean externo ← ✅
 
    @Transactional
    public Pedido crear(CrearPedidoRequest req) {
        Pedido pedido = guardarPedido(req);
 
        // ✅ Llamar al método @Async desde otro bean
        notificaciones.enviarEmailBienvenida(req.email(), req.nombre());
 
        return pedido;
    }
 
    // llamadas internas al mismo bean no funcionan,
    // notificaciones.enviarEmailBienvenida() sería ignorado si estuviera en este mismo bean
}

Ejecutar múltiples tareas @Async en paralelo

@Service
@RequiredArgsConstructor
public class DashboardService {
 
    private final MetricasService metricas;
    private final PedidosService pedidos;
    private final UsuariosService usuarios;
    private final InventarioService inventario;
 
    // Lanzar 4 consultas en paralelo con @Async + CompletableFuture
    public DashboardDTO cargarDashboard() {
        // Cada método es @Async → se ejecutan simultáneamente
        CompletableFuture<MetricasDTO>   metricasF   = metricas.obtenerAsync();
        CompletableFuture<PedidosDTO>    pedidosF    = pedidos.obtenerResumenAsync();
        CompletableFuture<UsuariosDTO>   usuariosF   = usuarios.obtenerEstadisticasAsync();
        CompletableFuture<InventarioDTO> inventarioF = inventario.obtenerAlertasAsync();
 
        // Esperar a todos y combinar
        CompletableFuture.allOf(metricasF, pedidosF, usuariosF, inventarioF).join();
 
        return new DashboardDTO(
            metricasF.join(),
            pedidosF.join(),
            usuariosF.join(),
            inventarioF.join()
        );
    }
}
 
// En los servicios individuales:
@Service
public class MetricasService {
 
    @Async("taskExecutor")
    public CompletableFuture<MetricasDTO> obtenerAsync() {
        MetricasDTO datos = calcularMetricas();  // operación lenta
        return CompletableFuture.completedFuture(datos);
    }
}

6. CompletableFuture en Spring

Patrón: Orquestación de llamadas a microservicios

// Caso de estudio: construir una respuesta combinando datos de varios microservicios
 
@Service
@RequiredArgsConstructor
@Slf4j
public class ProductoDetailService {
 
    private final RestTemplate restTemplate;
    private final WebClient webClient;  // sugerido para async
 
    @Async("ioExecutor")
    public CompletableFuture<PrecioDTO> obtenerPrecio(Long productoId) {
        PrecioDTO precio = webClient.get()
            .uri("/precios/{id}", productoId)
            .retrieve()
            .bodyToMono(PrecioDTO.class)
            .block();  // En @Async es aceptable bloquear (el hilo es del pool async)
        return CompletableFuture.completedFuture(precio);
    }
 
    @Async("ioExecutor")
    public CompletableFuture<InventarioDTO> obtenerInventario(Long productoId) {
        InventarioDTO inv = webClient.get()
            .uri("/inventario/{id}", productoId)
            .retrieve()
            .bodyToMono(InventarioDTO.class)
            .block();
        return CompletableFuture.completedFuture(inv);
    }
 
    @Async("ioExecutor")
    public CompletableFuture<List<ResenaDTO>> obtenerResenas(Long productoId) {
        return CompletableFuture.supplyAsync(() ->
            webClient.get()
                .uri("/resenas/producto/{id}", productoId)
                .retrieve()
                .bodyToFlux(ResenaDTO.class)
                .collectList()
                .block()
        );
    }
 
    // Orquestar las 3 llamadas en paralelo
    public ProductoDetalleDTO obtenerDetalle(Long productoId) {
        Producto producto = productoRepo.findById(productoId)
            .orElseThrow(() -> new EntityNotFoundException("Producto no encontrado"));
 
        // Lanzar en paralelo
        CompletableFuture<PrecioDTO>        precioF    = obtenerPrecio(productoId);
        CompletableFuture<InventarioDTO>    invF       = obtenerInventario(productoId);
        CompletableFuture<List<ResenaDTO>>  resenasF   = obtenerResenas(productoId);
 
        // Timeout + fallback para resiliencia
        CompletableFuture<PrecioDTO> precioSeguro = precioF
            .orTimeout(2, TimeUnit.SECONDS)
            .exceptionally(ex -> {
                log.warn("Servicio de precios no disponible: {}", ex.getMessage());
                return new PrecioDTO(producto.getPrecioBase(), "FALLBACK");
            });
 
        CompletableFuture<List<ResenaDTO>> resenasSeguras = resenasF
            .completeOnTimeout(List.of(), 1, TimeUnit.SECONDS);  // lista vacía si timeout
 
        CompletableFuture.allOf(precioSeguro, invF, resenasSeguras).join();
 
        return new ProductoDetalleDTO(
            producto,
            precioSeguro.join(),
            invF.join(),
            resenasSeguras.join()
        );
    }
}

Manejo de errores en CompletableFuture con Spring

@Service
public class ProcesadorPagosService {
 
    // Pipeline asíncrono con manejo de errores en cada etapa
    public CompletableFuture<ResultadoPago> procesarPago(SolicitudPago solicitud) {
        return CompletableFuture
            .supplyAsync(() -> validarSolicitud(solicitud), ioExecutor)
            .thenApplyAsync(validada -> reservarFondos(validada), ioExecutor)
            .thenApplyAsync(reserva -> ejecutarTransferencia(reserva), ioExecutor)
            .thenApplyAsync(tx -> notificar(tx), ioExecutor)
            .handle((resultado, error) -> {
                if (error != null) {
                    log.error("Error procesando pago: {}", error.getMessage());
                    auditoria.registrarFallo(solicitud, error);
                    return ResultadoPago.fallido(error.getMessage());
                }
                auditoria.registrarExito(solicitud, resultado);
                return resultado;
            });
    }
}

7. Spring WebFlux — Programación Reactiva

¿Cuándo usar WebFlux vs Spring MVC?

Spring MVC (Servlet, Blocking):
✅ Aplicaciones CRUD tradicionales
✅ Equipos con menos experiencia en reactive
✅ JPA/JDBC (no hay drivers reactivos maduros para todo)
✅ La mayoría de aplicaciones empresariales
❌ No escala bien con muchas conexiones concurrentes largas (SSE, WebSocket)

Spring WebFlux (Reactive, Non-Blocking):
✅ Muchas conexiones concurrentes (streaming, tiempo real)
✅ Integración entre microservicios
✅ Con R2DBC (reactive DB driver)
✅ Cuando ya usas Mono/Flux en toda la cadena
❌ Curva de aprendizaje alta
❌ Debugging más difícil
❌ No mezclar con código bloqueante (mata el beneficio)

WebClient — La forma moderna de hacer HTTP en Spring

// WebClient reemplaza a RestTemplate (que está deprecado)
// Funciona en MVC y WebFlux
 
@Configuration
public class WebClientConfig {
 
    @Bean
    public WebClient webClient() {
        return WebClient.builder()
            .baseUrl("https://api.example.com")
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .defaultHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
            .codecs(c -> c.defaultCodecs().maxInMemorySize(1024 * 1024))  // 1MB
            .filter(logRequest())
            .filter(logResponse())
            .build();
    }
 
    private ExchangeFilterFunction logRequest() {
        return ExchangeFilterFunction.ofRequestProcessor(req -> {
            log.debug("{} {}", req.method(), req.url());
            return Mono.just(req);
        });
    }
}
 
// Uso en servicio — modo BLOCKING (dentro de Spring MVC)
@Service
@RequiredArgsConstructor
public class ClienteExternoService {
 
    private final WebClient webClient;
 
    // Llamada bloqueante (aceptable en MVC con @Async)
    @Async("ioExecutor")
    public CompletableFuture<UsuarioExternoDTO> obtenerUsuarioAsync(String userId) {
        UsuarioExternoDTO usuario = webClient.get()
            .uri("/usuarios/{id}", userId)
            .retrieve()
            .onStatus(HttpStatusCode::is4xxClientError, response ->
                Mono.error(new EntityNotFoundException("Usuario no encontrado: " + userId))
            )
            .onStatus(HttpStatusCode::is5xxServerError, response ->
                Mono.error(new ServiceUnavailableException("Servicio externo no disponible"))
            )
            .bodyToMono(UsuarioExternoDTO.class)
            .timeout(Duration.ofSeconds(5))
            .block();  // bloquea porque estamos en MVC + @Async
        return CompletableFuture.completedFuture(usuario);
    }
 
    // Llamada con retry
    public UsuarioExternoDTO obtenerConRetry(String userId) {
        return webClient.get()
            .uri("/usuarios/{id}", userId)
            .retrieve()
            .bodyToMono(UsuarioExternoDTO.class)
            .retryWhen(Retry.backoff(3, Duration.ofMillis(500))
                .filter(ex -> ex instanceof ServiceUnavailableException))
            .block();
    }
 
    // Obtener lista con transformación
    public List<ProductoExternoDTO> listarProductos(String categoria) {
        return webClient.get()
            .uri(uri -> uri.path("/productos")
                .queryParam("categoria", categoria)
                .build())
            .retrieve()
            .bodyToFlux(ProductoExternoDTO.class)
            .collectList()
            .block();
    }
}

Controlador Reactivo básico

// Spring WebFlux Controller — devuelve Mono/Flux en vez de objetos directos
 
@RestController
@RequestMapping("/api/reactive/productos")
@RequiredArgsConstructor
public class ProductoReactiveController {
 
    private final ProductoReactiveService service;
 
    // Mono — cero o un elemento
    @GetMapping("/{id}")
    public Mono<ResponseEntity<ProductoDTO>> obtener(@PathVariable Long id) {
        return service.buscar(id)
            .map(ResponseEntity::ok)
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }
 
    // Flux — cero o muchos elementos
    @GetMapping
    public Flux<ProductoDTO> listar() {
        return service.listarTodos();
    }
 
    // Server-Sent Events — streaming de datos al cliente
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ProductoDTO> stream() {
        return service.listarTodos()
            .delayElements(Duration.ofMillis(100));  // uno por uno cada 100ms
    }
 
    // Crear con manejo de errores
    @PostMapping
    public Mono<ResponseEntity<ProductoDTO>> crear(@RequestBody @Valid CrearProductoRequest req) {
        return service.crear(req)
            .map(dto -> ResponseEntity.status(HttpStatus.CREATED).body(dto))
            .onErrorReturn(
                DuplicadoException.class,
                ResponseEntity.status(HttpStatus.CONFLICT).build()
            );
    }
}

8. Virtual Threads en Spring Boot 3.2+

Configuración — 3 líneas en application.properties

# application.properties (Spring Boot 3.2+ con Java 21)

# Habilitar Virtual Threads para Tomcat
spring.threads.virtual.enabled=true

# Esto reemplaza el ThreadPool tradicional de Tomcat con Virtual Threads
# Cada petición HTTP corre en su propio Virtual Thread
# → Misma escalabilidad que WebFlux, con código blocking simple
# application.yml
spring:
	threads:
		virtual:
			enabled:true
// Configuración programática (alternativa)
@Configuration
public class VirtualThreadsConfig {
 
    // Configurar Tomcat para usar Virtual Threads
    @Bean
    public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutor() {
        return protocolHandler ->
            protocolHandler.setExecutor(
                Executors.newVirtualThreadPerTaskExecutor()
            );
    }
 
    // Configurar @Async para usar Virtual Threads
    @Bean(name = "virtualThreadExecutor")
    public Executor virtualThreadExecutor() {
        return Executors.newVirtualThreadPerTaskExecutor();
    }
 
    // Configurar tareas programadas con Virtual Threads
    @Bean
    public TaskScheduler taskSchedulerVirtual() {
        SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
        scheduler.setVirtualThreads(true);  // Spring 6.1+
        return scheduler;
    }
}

Impacto real de Virtual Threads

// ANTES (Spring MVC + Platform Threads):
// - Tomcat tiene un pool de ~200 hilos por defecto
// - Cada petición que espera I/O (BD, HTTP, etc.) bloquea un hilo del pool
// - Con 200 peticiones lentas en curso → 201ª petición espera en cola
 
// DESPUÉS (Spring MVC + Virtual Threads):
// - Cada petición tiene su propio Virtual Thread (~KB de memoria)
// - El Virtual Thread se "desmonta" cuando hace I/O → el carrier thread sirve otro
// - Puede tener millones de peticiones concurrentes
// - El código sigue siendo blocking/imperativo ← misma simplicidad
 
// Mismo código, mayor escalabilidad:
@RestController
public class ProductoController {
 
    @GetMapping("/{id}")
    public ProductoDTO obtener(@PathVariable Long id) {
        // Este código es blocking — pero ahora escala como si fuera reactivo
        Producto p = repo.findById(id)  // I/O bloqueante → VT se desmonta
            .orElseThrow(() -> new EntityNotFoundException("No encontrado"));
 
        String precio = webClient.get().uri("/precio/{id}", id)
            .retrieve().bodyToMono(String.class).block();  // I/O → VT se desmonta
 
        return new ProductoDTO(p, precio);  // sencillo, legible, escalable
    }
}

@Async con Virtual Threads

@Configuration
@EnableAsync
public class AsyncVirtualConfig implements AsyncConfigurer {
 
    @Override
    public Executor getAsyncExecutor() {
        // Con Java 21: usar Virtual Threads para @Async
        return Executors.newVirtualThreadPerTaskExecutor();
    }
}
 
// Ahora @Async es virtualmente ilimitado en concurrencia
@Service
public class ImportacionMasivaService {
 
    @Async
    public CompletableFuture<Void> importar(List<RegistroDTO> registros) {
        // Cada llamada @Async crea un Virtual Thread barato
        // Puedes llamar esto miles de veces sin preocuparte por el pool
        registros.forEach(this::procesarRegistro);
        return CompletableFuture.completedFuture(null);
    }
}

9. Transacciones y Concurrencia

El problema más común: transacciones con operaciones async

// PROBLEMA FRECUENTE: @Transactional + @Async no funcionan como se espera
 
@Service
public class PedidoService {
 
    // El contexto transaccional NO se propaga a hilos @Async
    @Transactional
    public void procesarPedido(Long pedidoId) {
        Pedido pedido = repo.findById(pedidoId).orElseThrow();
        pedido.setEstado("PROCESANDO");
        repo.save(pedido);
 
        // Este método @Async corre en OTRO hilo → SIN la transacción actual
        notificaciones.enviarEmailAsync(pedido);  // ← puede fallar o ver datos sucios
    }
}
 
// ✅ SOLUCIÓN: separar la transacción de la operación async
@Service
public class PedidoService {
 
    @Transactional
    public Pedido procesarPedido(Long pedidoId) {
        Pedido pedido = repo.findById(pedidoId).orElseThrow();
        pedido.setEstado("PROCESANDO");
        return repo.save(pedido);  // transacción se completa (commit) aquí
    }
}
 
@Service
public class PedidoOrquestador {
 
    @Autowired private PedidoService pedidoService;
    @Autowired private NotificacionService notificaciones;
 
    public void procesarYNotificar(Long pedidoId) {
        // 1. Primero la transacción (bloqueante, se hace commit)
        Pedido pedido = pedidoService.procesarPedido(pedidoId);
 
        // 2. Después el async (transacción ya commiteada → datos consistentes)
        notificaciones.enviarEmailAsync(pedido);
    }
}

Niveles de aislamiento y bloqueos optimistas/pesimistas

public interface PedidoRepository extends JpaRepository<Pedido, Long> {
 
    // Bloqueo pesimista — ningún otro hilo puede leer/modificar el registro
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @Query("SELECT p FROM Pedido p WHERE p.id = :id")
    Optional<Pedido> findByIdConLockExclusivo(@Param("id") Long id);
 
    // Bloqueo optimista — verifica versión antes de guardar (sin lock en BD)
    @Lock(LockModeType.OPTIMISTIC_FORCE_INCREMENT)
    Optional<Pedido> findById(Long id);
}
 
// Entidad con control de versión para bloqueo optimista
@Entity
public class Inventario {
    @Id
    private Long id;
 
    private int stock;
 
    @Version  // ← Spring Data gestiona esto automáticamente
    private Long version;
}
 
// Servicio con manejo de conflictos optimistas
@Service
@RequiredArgsConstructor
public class InventarioService {
 
    private final InventarioRepository repo;
 
    @Transactional
    @Retryable(  // Spring Retry — reintenta si hay conflicto optimista
        value = OptimisticLockingFailureException.class,
        maxAttempts = 3,
        backoff = @Backoff(delay = 100)
    )
    public void reducirStock(Long productoId, int cantidad) {
        Inventario inv = repo.findById(productoId)
            .orElseThrow(() -> new EntityNotFoundException("Producto no encontrado"));
 
        if (inv.getStock() < cantidad) {
            throw new StockInsuficienteException("Stock insuficiente para: " + productoId);
        }
 
        inv.setStock(inv.getStock() - cantidad);
        repo.save(inv);
        // Si dos transacciones modificaron el mismo registro → OptimisticLockException
        // @Retryable reintenta automáticamente
    }
}

Propagación de transacciones en contextos concurrentes

@Service
@RequiredArgsConstructor
public class ImportacionService {
 
    private final RegistroRepository repo;
    private final ImportacionItemService itemService;
 
    // Procesar miles de registros en paralelo con transacciones independientes
    @Transactional(readOnly = true)  // lectura optimizada
    public ResultadoImportacion importar(List<RegistroDTO> registros) {
        AtomicInteger exitosos = new AtomicInteger(0);
        AtomicInteger fallidos = new AtomicInteger(0);
 
        // Procesar en paralelo — cada item en su PROPIA transacción
        registros.parallelStream()
            .forEach(registro -> {
                try {
                    // REQUIRES_NEW → transacción independiente por registro
                    itemService.procesarConTransaccionPropia(registro);
                    exitosos.incrementAndGet();
                } catch (Exception e) {
                    log.warn("Falló registro {}: {}", registro.id(), e.getMessage());
                    fallidos.incrementAndGet();
                }
            });
 
        return new ResultadoImportacion(exitosos.get(), fallidos.get());
    }
}
 
@Service
public class ImportacionItemService {
 
    // REQUIRES_NEW → siempre una transacción nueva, independiente del llamador
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void procesarConTransaccionPropia(RegistroDTO registro) {
        // Si falla, solo hace rollback de ESTE registro, no de todos
        Entidad entidad = mapper.toEntidad(registro);
        repo.save(entidad);
    }
}

10. Caché, Sesiones y Thread-Safety en Spring

Spring Cache — thread-safe por diseño

@Configuration
@EnableCaching
public class CacheConfig {
 
    // Caffeine — cache local en memoria de alto rendimiento
    @Bean
    public CacheManager cacheManager() {
        CaffeineCacheManager manager = new CaffeineCacheManager();
        manager.setCaffeine(Caffeine.newBuilder()
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .maximumSize(1000)
            .recordStats()
        );
        return manager;
    }
 
    // Redis — cache distribuida para múltiples instancias
    @Bean
    public RedisCacheManager redisCacheManager(RedisConnectionFactory factory) {
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
            .entryTtl(Duration.ofMinutes(10))
            .serializeValuesWith(
                RedisSerializationContext.SerializationPair.fromSerializer(
                    new GenericJackson2JsonRedisSerializer()
                )
            );
        return RedisCacheManager.builder(factory)
            .cacheDefaults(config)
            .build();
    }
}
 
// Uso en servicios — @Cacheable es thread-safe
@Service
@RequiredArgsConstructor
public class ProductoService {
 
    // Spring gestiona la sincronización de la cache automáticamente
    @Cacheable(value = "productos", key = "#id")
    public ProductoDTO obtener(Long id) {
        return repo.findById(id)
            .map(mapper::toDTO)
            .orElseThrow(() -> new EntityNotFoundException("No encontrado: " + id));
    }
 
    // Invalidar cache cuando se actualiza
    @CacheEvict(value = "productos", key = "#id")
    @Transactional
    public ProductoDTO actualizar(Long id, ActualizarProductoRequest req) {
        Producto p = repo.findById(id).orElseThrow();
        mapper.actualizar(p, req);
        return mapper.toDTO(repo.save(p));
    }
 
    // Actualizar la cache con el nuevo valor
    @CachePut(value = "productos", key = "#result.id")
    @Transactional
    public ProductoDTO crear(CrearProductoRequest req) {
        Producto p = mapper.toEntidad(req);
        return mapper.toDTO(repo.save(p));
    }
 
    // Evict de múltiples entradas
    @Caching(evict = {
        @CacheEvict(value = "productos", key = "#id"),
        @CacheEvict(value = "producto-resumen", key = "#id")
    })
    @Transactional
    public void eliminar(Long id) {
        repo.deleteById(id);
    }
}

Beans de Spring y Thread-Safety

// Los Beans de Spring son SINGLETON por defecto → compartidos entre hilos
// NUNCA guardar estado mutable en campos de un Bean
 
// es peligroso — estado mutable en un singleton
@Service
public class ProcesadorRoto {
    private List<String> resultados = new ArrayList<>();  // ← compartido entre todos los hilos
 
    public List<String> procesar(List<String> datos) {
        resultados.clear();  // Thread A limpia
        // Thread B puede limpiar aquí también
        datos.forEach(d -> resultados.add(transformar(d)));  // condición de carrera
        return resultados;
    }
}
 
// correcto -> sin estado mutable en el bean
@Service
public class ProcesadorCorrecto {
    // Solo campos final e inmutables
    private final TransformacionService transformacion;
 
    public List<String> procesar(List<String> datos) {
        // Variables locales → en el stack del hilo → siempre thread-safe
        return datos.stream()
            .map(transformacion::transformar)
            .collect(Collectors.toList());
    }
}
 
// Si se llega a necesitar estado compartido, usar estructuras concurrentes
@Service
public class ContadorMetricas {
    // Campos concurrentes — thread-safe
    private final ConcurrentHashMap<String, AtomicLong> contadores = new ConcurrentHashMap<>();
    private final LongAdder totalPeticiones = new LongAdder();
 
    public void registrar(String endpoint) {
        contadores.computeIfAbsent(endpoint, k -> new AtomicLong()).incrementAndGet();
        totalPeticiones.increment();
    }
 
    public Map<String, Long> getSnapshot() {
        return contadores.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
    }
}

@Scope y thread-safety

// SINGLETON (default) — una instancia compartida entre todos los hilos
// → thread-safe si no tiene estado mutable
 
// PROTOTYPE — nueva instancia en cada inyección
// → thread-safe por diseño (no se comparte)
@Component
@Scope("prototype")
public class ProcesadorConEstado {
    private final List<String> buffer = new ArrayList<>();  // ok — no se comparte
 
    public void agregar(String item) { buffer.add(item); }
    public List<String> vaciar() {
        List<String> resultado = new ArrayList<>(buffer);
        buffer.clear();
        return resultado;
    }
}
 
// REQUEST scope — nueva instancia por petición HTTP (solo en web)
@Component
@Scope(value = WebApplicationContext.SCOPE_REQUEST, proxyMode = ScopedProxyMode.TARGET_CLASS)
public class ContextoPeticion {
    private String usuarioActual;
    private String correlationId = UUID.randomUUID().toString();
    // seguro usar en un Bean singleton que lo inyecte
}
 
// SESSION scope — por sesión HTTP
@Component
@Scope(value = WebApplicationContext.SCOPE_SESSION, proxyMode = ScopedProxyMode.TARGET_CLASS)
public class CarritoCompra {
    private List<ItemCarrito> items = new ArrayList<>();
    // cada usuario tiene su propio carrito
}

11. Spring Batch — Procesamiento en Paralelo

Procesamiento paralelo con Spring Batch

// Spring Batch ya gestiona la concurrencia — solo hay que configurarla
 
@Configuration
@EnableBatchProcessing
public class ImportacionBatchConfig {
 
    @Bean
    public Job importarProductosJob(JobBuilderFactory jobs, Step importarStep) {
        return jobs.get("importarProductos")
            .start(importarStep)
            .build();
    }
 
    @Bean
    public Step importarStep(
        StepBuilderFactory steps,
        ItemReader<ProductoCSV> reader,
        ItemProcessor<ProductoCSV, Producto> processor,
        ItemWriter<Producto> writer
    ) {
        return steps.get("importarProductos")
            .<ProductoCSV, Producto>chunk(100)  // procesar de 100 en 100
            .reader(reader)
            .processor(processor)
            .writer(writer)
            // ← PARALELISMO: múltiples hilos procesando chunks simultáneamente
            .taskExecutor(batchExecutor())
            .throttleLimit(4)  // máximo 4 chunks en paralelo
            .build();
    }
 
    @Bean
    public TaskExecutor batchExecutor() {
        ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
        exec.setCorePoolSize(4);
        exec.setMaxPoolSize(8);
        exec.setThreadNamePrefix("batch-");
        exec.initialize();
        return exec;
    }
 
    // Partición — dividir el trabajo en partes independientes
    @Bean
    public Step importarConParticion(StepBuilderFactory steps, ...) {
        return steps.get("importarParticionado")
            .partitioner("importarStep", partitioner())
            .step(importarStep(...))
            .gridSize(4)  // 4 particiones
            .taskExecutor(batchExecutor())
            .build();
    }
}

12. Eventos y Mensajería Asíncrona

Spring Events — Desacoplar con eventos internos

// Definir eventos como Records (inmutables)
record PedidoCreado(Long pedidoId, String clienteEmail, BigDecimal total) {}
record UsuarioRegistrado(Long usuarioId, String email, String nombre) {}
 
// Publicar eventos en el servicio
@Service
@RequiredArgsConstructor
public class PedidoService {
 
    private final PedidoRepository repo;
    private final ApplicationEventPublisher eventPublisher;
 
    @Transactional
    public Pedido crear(CrearPedidoRequest request) {
        Pedido pedido = mapper.toEntidad(request);
        Pedido guardado = repo.save(pedido);
 
        // Publicar evento — los listeners se encargan de lo demás
        // Desacopla el servicio de pedidos de notificaciones, inventario, etc.
        eventPublisher.publishEvent(
            new PedidoCreado(guardado.getId(), request.email(), guardado.getTotal())
        );
 
        return guardado;
    }
}
 
// Listeners — pueden ser síncronos o asíncronos
@Component
@RequiredArgsConstructor
@Slf4j
public class PedidoEventListener {
 
    private final EmailService email;
    private final InventarioService inventario;
    private final AnaliticasService analiticas;
 
    // Síncrono — dentro de la misma transacción si hay una activa
    @EventListener
    public void onPedidoCreado(PedidoCreado evento) {
        inventario.reservar(evento.pedidoId());
    }
 
    // Asíncrono — fire and forget en otro hilo
    @Async("ioExecutor")
    @EventListener
    public void enviarConfirmacion(PedidoCreado evento) {
        email.enviarConfirmacionPedido(evento.clienteEmail(), evento.pedidoId());
    }
 
    // Se ejecuta DESPUÉS del commit de la transacción (muy útil)
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void registrarEnAnaliticas(PedidoCreado evento) {
        // Solo se llama si la transacción se commitó exitosamente
        analiticas.registrarPedido(evento);
    }
 
    // Se ejecuta si la transacción hace rollback
    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
    public void limpiarRastreo(PedidoCreado evento) {
        log.warn("Pedido {} falló y fue revertido", evento.pedidoId());
    }
}

Spring + RabbitMQ/Kafka para mensajería asíncrona

// PUBLICAR mensajes a RabbitMQ
@Service
@RequiredArgsConstructor
public class PedidoMessagePublisher {
 
    private final RabbitTemplate rabbitTemplate;
 
    @Transactional
    public void publicarPedidoCreado(Pedido pedido) {
        PedidoCreadoMessage msg = new PedidoCreadoMessage(
            pedido.getId(), pedido.getClienteId(), pedido.getTotal()
        );
        // Publicar a exchange, el broker enruta a la cola correspondiente
        rabbitTemplate.convertAndSend("pedidos.exchange", "pedido.creado", msg);
    }
}
 
// CONSUMIR mensajes de RabbitMQ (en su propio hilo del pool de listeners)
@Component
@RequiredArgsConstructor
@Slf4j
public class NotificacionMessageConsumer {
 
    private final NotificacionService notificaciones;
 
    @RabbitListener(
        queues = "pedidos.notificaciones",
        concurrency = "3-10"  // entre 3 y 10 consumers concurrentes
    )
    public void procesarPedidoCreado(PedidoCreadoMessage msg) {
        log.info("Procesando pedido {} en hilo: {}",
            msg.pedidoId(), Thread.currentThread().getName());
        notificaciones.enviarConfirmacion(msg.pedidoId());
    }
}
 
// KAFKA — Producer
@Service
@RequiredArgsConstructor
public class EventoKafkaPublisher {
 
    private final KafkaTemplate<String, Object> kafkaTemplate;
 
    public void publicar(String topic, String key, Object mensaje) {
        kafkaTemplate.send(topic, key, mensaje)
            .thenAccept(result ->
                log.debug("Mensaje enviado a partición {} offset {}",
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset())
            )
            .exceptionally(ex -> {
                log.error("Error enviando mensaje a Kafka: {}", ex.getMessage());
                return null;
            });
    }
}
 
// KAFKA — Consumer
@Component
@Slf4j
public class PedidoKafkaConsumer {
 
    @KafkaListener(
        topics = "pedidos-creados",
        groupId = "notificaciones-service",
        concurrency = "3"  // 3 consumidores paralelos
    )
    public void consumir(
        @Payload PedidoCreadoEvent evento,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset
    ) {
        log.info("Recibido evento {} de partición {} offset {}", evento, partition, offset);
        procesarEvento(evento);
    }
}

13. Problemas Comunes y Cómo Evitarlos

Error 1: LazyInitializationException en contextos async

// PROBLEMA: JPA Lazy loading fuera del contexto de la sesión Hibernate
@Entity
public class Pedido {
    @OneToMany(fetch = FetchType.LAZY)  // ← carga lazy
    private List<Item> items;
}
 
//  FALLA: items se accede fuera de la transacción
@Async
public CompletableFuture<ResumenPedido> calcularResumenAsync(Long pedidoId) {
    Pedido pedido = repo.findById(pedidoId).orElseThrow();  // sin @Transactional
    return CompletableFuture.completedFuture(
        new ResumenPedido(pedido.getItems().size())  // LazyInitializationException!
    );
}
 
//  SOLUCIÓN 1: Cargar los datos antes del async (dentro de la tx)
public CompletableFuture<ResumenPedido> calcularResumenAsync(Long pedidoId) {
    // Cargar dentro de la transacción
    ResumenPedido datos = calcularResumenEnTransaccion(pedidoId);
    // Ya no hay acceso lazy fuera de la tx
    return CompletableFuture.completedFuture(datos);
}
 
@Transactional(readOnly = true)
public ResumenPedido calcularResumenEnTransaccion(Long pedidoId) {
    Pedido pedido = repo.findById(pedidoId).orElseThrow();
    return new ResumenPedido(pedido.getItems().size());  // dentro de la tx ✅
}
 
//  SOLUCIÓN 2: JOIN FETCH en la query
public interface PedidoRepository extends JpaRepository<Pedido, Long> {
    @Query("SELECT p FROM Pedido p JOIN FETCH p.items WHERE p.id = :id")
    Optional<Pedido> findByIdConItems(@Param("id") Long id);
}
 
//  SOLUCIÓN 3: @EntityGraph
@EntityGraph(attributePaths = {"items", "items.producto"})
Optional<Pedido> findById(Long id);

Error 2: @Transactional ignorado en llamadas internas

// PROBLEMA: Spring AOP no intercepta llamadas al mismo objeto
@Service
public class PedidoService {
 
    @Transactional
    public void procesarPedidoPublico(Long id) {
        // Este método llama a otro del mismo bean
        procesarPedidoInterno(id);  //  @Transactional de procesarPedidoInterno es IGNORADO
    }
 
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void procesarPedidoInterno(Long id) {
        // Esta transacción nueva NUNCA se crea — llamada interna sin proxy
    }
}
 
//  SOLUCIÓN 1: Extraer a otro Bean
@Service
public class PedidoInternalService {
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void procesarPedidoInterno(Long id) { /* ... */ }
}
 
//  SOLUCIÓN 2: Auto-inyectarse (no recomendado pero funciona)
@Service
public class PedidoService {
 
    @Autowired
    @Lazy
    private PedidoService self;  // proxy de Spring ← llamadas van por el proxy
 
    @Transactional
    public void procesarPedidoPublico(Long id) {
        self.procesarPedidoInterno(id);  //  va por el proxy → @Transactional activo
    }
 
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void procesarPedidoInterno(Long id) { /* ... */ }
}

Error 3: N+1 Queries en Streams

// PROBLEMA: consulta N+1 con streams sobre colecciones JPA
 
//  Genera 1 query para productos + N queries para categorías
List<ProductoDTO> productos = repo.findAll().stream()  // 1 query
    .map(p -> new ProductoDTO(
        p.getNombre(),
        p.getCategoria().getNombre()  // N queries lazy!
    ))
    .toList();
 
// SOLUCIÓN 1: JOIN FETCH
@Query("SELECT p FROM Producto p JOIN FETCH p.categoria WHERE p.activo = true")
List<Producto> findActivosConCategoria();
 
// SOLUCIÓN 2: @EntityGraph
@EntityGraph(attributePaths = "categoria")
List<Producto> findByActivoTrue();
 
// SOLUCIÓN 3: DTO projection con JPQL (más eficiente — trae solo lo necesario)
@Query("SELECT new com.app.dto.ProductoDTO(p.nombre, c.nombre) " +
       "FROM Producto p JOIN p.categoria c WHERE p.activo = true")
List<ProductoDTO> findActivosDTO();

Error 4: Deadlocks en transacciones concurrentes

// PROBLEMA: Dos transacciones se bloquean mutuamente
 
// Tx A: UPDATE inventario WHERE id=1, luego UPDATE inventario WHERE id=2
// Tx B: UPDATE inventario WHERE id=2, luego UPDATE inventario WHERE id=1
// → Deadlock garantizado si ejecutan concurrentemente
 
//  SOLUCIÓN 1: Siempre acceder a registros en el mismo orden
@Transactional
public void transferir(Long cuentaOrigenId, Long cuentaDestinoId, BigDecimal monto) {
    // Ordenar siempre por ID para evitar deadlock
    Long primerLock = Math.min(cuentaOrigenId, cuentaDestinoId);
    Long segundoLock = Math.max(cuentaOrigenId, cuentaDestinoId);
 
    Cuenta primera = repo.findByIdConLock(primerLock).orElseThrow();
    Cuenta segunda = repo.findByIdConLock(segundoLock).orElseThrow();
 
    Cuenta origen  = cuentaOrigenId.equals(primerLock) ? primera : segunda;
    Cuenta destino = cuentaDestinoId.equals(primerLock) ? primera : segunda;
 
    origen.debitar(monto);
    destino.acreditar(monto);
}
 
//  SOLUCIÓN 2: Bloqueo optimista con reintentos
@Retryable(value = OptimisticLockingFailureException.class, maxAttempts = 3)
@Transactional
public void actualizarInventario(Long id, int delta) {
    Inventario inv = repo.findById(id).orElseThrow();
    inv.ajustarStock(delta);
    repo.save(inv);
}

Error 5: Modificar colecciones en streams paralelos

// PROBLEMA: Modificar estado compartido en parallelStream
 
// Race condition — ArrayList no es thread-safe
List<String> resultados = new ArrayList<>();
datos.parallelStream()
    .filter(d -> d.isValido())
    .forEach(d -> resultados.add(d.toString()));  // ← PELIGROSO
 
// SOLUCIÓN: usar collect en vez de forEach
List<String> resultados = datos.parallelStream()
    .filter(Datos::isValido)
    .map(Datos::toString)
    .collect(Collectors.toList());  // ← thread-safe, correcto
 
// O si necesitas acumulación, usar un collector concurrente
ConcurrentHashMap<String, Long> conteo = datos.parallelStream()
    .collect(Collectors.toConcurrentMap(
        Datos::getCategoria,
        d -> 1L,
        Long::sum
    ));

📋 Resumen: Concurrencia en Spring Boot

NecesidadHerramienta de Spring
Método que no bloquea la petición@Async + CompletableFuture
Muchas peticiones I/O concurrentesVirtual Threads (spring.threads.virtual.enabled=true)
Streaming de datos al clienteSpring WebFlux + Flux / SSE
Llamadas paralelas a microserviciosWebClient + CompletableFuture.allOf()
Cache thread-safe@Cacheable (Caffeine o Redis)
Procesar grandes volúmenes de datosSpring Batch con taskExecutor y throttleLimit
Desacoplar acciones post-transacción@TransactionalEventListener(AFTER_COMMIT)
Mensajes asíncronos entre serviciosRabbitMQ / Kafka con @RabbitListener / @KafkaListener
Evitar deadlocks en JPA@Lock(PESSIMISTIC_WRITE) + orden consistente
Reintento ante fallos concurrentes@Retryable con OptimisticLockingFailureException

🔬 Ejercicio Final: API de E-Commerce con concurrencia

// Implementa un endpoint POST /pedidos que:
// 1. Valide stock de cada producto (transacción con bloqueo optimista)
// 2. Llame al servicio de precios en paralelo (WebClient + CompletableFuture)
// 3. Reserve el inventario (transacción propia con @Retryable)
// 4. Publique evento AFTER_COMMIT para notificaciones
// 5. Retorne la respuesta sin esperar las notificaciones
 
@RestController
@RequestMapping("/api/pedidos")
@RequiredArgsConstructor
public class PedidoController {
 
    private final PedidoOrquestador orquestador;
 
    @PostMapping
    public ResponseEntity<PedidoResponse> crear(@RequestBody @Valid CrearPedidoRequest req) {
        return ResponseEntity.status(HttpStatus.CREATED)
            .body(orquestador.crearPedido(req));
    }
}
 
@Service
@RequiredArgsConstructor
@Transactional
public class PedidoOrquestador {
 
    private final InventarioService inventario;
    private final PrecioService precios;
    private final PedidoRepository repo;
    private final ApplicationEventPublisher events;
 
    public PedidoResponse crearPedido(CrearPedidoRequest req) {
        // 1. Precios en paralelo (async, no bloqueante del hilo principal si usamos VT)
        Map<Long, BigDecimal> preciosActuales = precios.obtenerPreciosParalelo(
            req.items().stream().map(ItemRequest::productoId).toList()
        );
 
        // 2. Reservar inventario con bloqueo optimista + retry
        req.items().forEach(item ->
            inventario.reservar(item.productoId(), item.cantidad())
        );
 
        // 3. Calcular total y guardar pedido
        BigDecimal total = req.items().stream()
            .map(item -> preciosActuales.get(item.productoId())
                .multiply(BigDecimal.valueOf(item.cantidad())))
            .reduce(BigDecimal.ZERO, BigDecimal::add);
 
        Pedido pedido = new Pedido(req.clienteId(), req.items(), total);
        Pedido guardado = repo.save(pedido);
 
        // 4. Evento post-commit (notificaciones no bloquean la respuesta)
        events.publishEvent(new PedidoCreado(guardado.getId(), req.email(), total));
 
        return mapper.toResponse(guardado);
        // La respuesta se retorna ANTES de que el email se envíe
    }
}

📚 Recursos recomendados:

  • Spring Boot Reference Documentation — docs.spring.io
  • Spring in Action — Craig Walls
  • Spring Blog: “Virtual Threads in Spring Boot 3.2” — spring.io/blog
  • Baeldung: Spring @Async, Spring Transactions, WebFlux Guide
  • JEP 444: Virtual Threads — openjdk.org