Concurrencia
☕ Java Moderno & Concurrencia en Spring Boot
Streams · Lambdas · Async · Reactive · Thread-Safety en el contexto real de Spring
Objetivo: Dominar Java moderno y concurrencia aplicados directamente al desarrollo con Spring Boot. Cada concepto va acompañado de su uso real en servicios, controladores, repositorios y configuraciones de Spring.
📌 ÍNDICE
- @Async y AsyncConfigurer
- CompletableFuture en Spring
- Spring WebFlux — Programación Reactiva
- Virtual Threads en Spring Boot 3.2+
- Transacciones y Concurrencia
- Caché, Sesiones y Thread-Safety en Spring
- Spring Batch — Procesamiento en Paralelo
- Eventos y Mensajería Asíncrona
- Problemas Comunes y Cómo Evitarlos
5. @Async y AsyncConfigurer
Configurar el ThreadPool para @Async
// Por defecto, @Async usa SimpleAsyncTaskExecutor (un thread por tarea), que es lento
// Siempre es recomendable configurar un propio ThreadPoolTaskExecutor
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
@Bean("taskExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Número de CPUs disponibles
int cpus = Runtime.getRuntime().availableProcessors();
executor.setCorePoolSize(cpus); // hilos siempre activos
executor.setMaxPoolSize(cpus * 2); // máximo bajo carga
executor.setQueueCapacity(500); // cola de tareas en espera
executor.setKeepAliveSeconds(60); // tiempo de vida de hilos extras
executor.setThreadNamePrefix("async-"); // nombre para debugging
executor.setWaitForTasksToCompleteOnShutdown(true); // graceful shutdown
executor.setAwaitTerminationSeconds(30);
executor.initialize();
return executor;
}
// Executor separado para tareas de I/O (más hilos porque bloquean)
@Bean("ioExecutor")
public Executor ioExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("io-async-");
executor.initialize();
return executor;
}
// Manejo de excepciones no capturadas en métodos @Async
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> {
log.error("Error en método async: {}.{}() — {}",
method.getDeclaringClass().getSimpleName(),
method.getName(),
ex.getMessage(), ex);
// Aquí podrías enviar alerta, guardar en BD, etc.
};
}
}Usar @Async en servicios
@Service
@RequiredArgsConstructor
@Slf4j
public class NotificacionService {
private final EmailSender emailSender;
private final PushSender pushSender;
// ① @Async sin retorno — "fire and forget"
@Async("ioExecutor")
public void enviarEmailBienvenida(String email, String nombre) {
log.info("Enviando email a {} en hilo: {}", email, Thread.currentThread().getName());
emailSender.enviar(email, "Bienvenido " + nombre, prepararTemplate(nombre));
}
// ② @Async con CompletableFuture — permite obtener el resultado
@Async("ioExecutor")
public CompletableFuture<Boolean> enviarConConfirmacion(String email) {
try {
emailSender.enviar(email, "Confirmación", "...");
return CompletableFuture.completedFuture(true);
} catch (Exception ex) {
log.error("Error enviando email a {}", email, ex);
return CompletableFuture.completedFuture(false);
}
}
// ③ @Async que retorna Future (antiguo, menos recomendado)
@Async
public Future<String> procesarConFuture(String datos) {
String resultado = procesarDatos(datos);
return new AsyncResult<>(resultado);
}
}
// REGLA IMPORTANTE: @Async solo funciona cuando el método es llamado
// DESDE OTRO BEAN (no desde el mismo bean — ya que Spring usa proxies)
@Service
public class PedidoService {
@Autowired
private NotificacionService notificaciones; // bean externo ← ✅
@Transactional
public Pedido crear(CrearPedidoRequest req) {
Pedido pedido = guardarPedido(req);
// ✅ Llamar al método @Async desde otro bean
notificaciones.enviarEmailBienvenida(req.email(), req.nombre());
return pedido;
}
// llamadas internas al mismo bean no funcionan,
// notificaciones.enviarEmailBienvenida() sería ignorado si estuviera en este mismo bean
}Ejecutar múltiples tareas @Async en paralelo
@Service
@RequiredArgsConstructor
public class DashboardService {
private final MetricasService metricas;
private final PedidosService pedidos;
private final UsuariosService usuarios;
private final InventarioService inventario;
// Lanzar 4 consultas en paralelo con @Async + CompletableFuture
public DashboardDTO cargarDashboard() {
// Cada método es @Async → se ejecutan simultáneamente
CompletableFuture<MetricasDTO> metricasF = metricas.obtenerAsync();
CompletableFuture<PedidosDTO> pedidosF = pedidos.obtenerResumenAsync();
CompletableFuture<UsuariosDTO> usuariosF = usuarios.obtenerEstadisticasAsync();
CompletableFuture<InventarioDTO> inventarioF = inventario.obtenerAlertasAsync();
// Esperar a todos y combinar
CompletableFuture.allOf(metricasF, pedidosF, usuariosF, inventarioF).join();
return new DashboardDTO(
metricasF.join(),
pedidosF.join(),
usuariosF.join(),
inventarioF.join()
);
}
}
// En los servicios individuales:
@Service
public class MetricasService {
@Async("taskExecutor")
public CompletableFuture<MetricasDTO> obtenerAsync() {
MetricasDTO datos = calcularMetricas(); // operación lenta
return CompletableFuture.completedFuture(datos);
}
}6. CompletableFuture en Spring
Patrón: Orquestación de llamadas a microservicios
// Caso de estudio: construir una respuesta combinando datos de varios microservicios
@Service
@RequiredArgsConstructor
@Slf4j
public class ProductoDetailService {
private final RestTemplate restTemplate;
private final WebClient webClient; // sugerido para async
@Async("ioExecutor")
public CompletableFuture<PrecioDTO> obtenerPrecio(Long productoId) {
PrecioDTO precio = webClient.get()
.uri("/precios/{id}", productoId)
.retrieve()
.bodyToMono(PrecioDTO.class)
.block(); // En @Async es aceptable bloquear (el hilo es del pool async)
return CompletableFuture.completedFuture(precio);
}
@Async("ioExecutor")
public CompletableFuture<InventarioDTO> obtenerInventario(Long productoId) {
InventarioDTO inv = webClient.get()
.uri("/inventario/{id}", productoId)
.retrieve()
.bodyToMono(InventarioDTO.class)
.block();
return CompletableFuture.completedFuture(inv);
}
@Async("ioExecutor")
public CompletableFuture<List<ResenaDTO>> obtenerResenas(Long productoId) {
return CompletableFuture.supplyAsync(() ->
webClient.get()
.uri("/resenas/producto/{id}", productoId)
.retrieve()
.bodyToFlux(ResenaDTO.class)
.collectList()
.block()
);
}
// Orquestar las 3 llamadas en paralelo
public ProductoDetalleDTO obtenerDetalle(Long productoId) {
Producto producto = productoRepo.findById(productoId)
.orElseThrow(() -> new EntityNotFoundException("Producto no encontrado"));
// Lanzar en paralelo
CompletableFuture<PrecioDTO> precioF = obtenerPrecio(productoId);
CompletableFuture<InventarioDTO> invF = obtenerInventario(productoId);
CompletableFuture<List<ResenaDTO>> resenasF = obtenerResenas(productoId);
// Timeout + fallback para resiliencia
CompletableFuture<PrecioDTO> precioSeguro = precioF
.orTimeout(2, TimeUnit.SECONDS)
.exceptionally(ex -> {
log.warn("Servicio de precios no disponible: {}", ex.getMessage());
return new PrecioDTO(producto.getPrecioBase(), "FALLBACK");
});
CompletableFuture<List<ResenaDTO>> resenasSeguras = resenasF
.completeOnTimeout(List.of(), 1, TimeUnit.SECONDS); // lista vacía si timeout
CompletableFuture.allOf(precioSeguro, invF, resenasSeguras).join();
return new ProductoDetalleDTO(
producto,
precioSeguro.join(),
invF.join(),
resenasSeguras.join()
);
}
}Manejo de errores en CompletableFuture con Spring
@Service
public class ProcesadorPagosService {
// Pipeline asíncrono con manejo de errores en cada etapa
public CompletableFuture<ResultadoPago> procesarPago(SolicitudPago solicitud) {
return CompletableFuture
.supplyAsync(() -> validarSolicitud(solicitud), ioExecutor)
.thenApplyAsync(validada -> reservarFondos(validada), ioExecutor)
.thenApplyAsync(reserva -> ejecutarTransferencia(reserva), ioExecutor)
.thenApplyAsync(tx -> notificar(tx), ioExecutor)
.handle((resultado, error) -> {
if (error != null) {
log.error("Error procesando pago: {}", error.getMessage());
auditoria.registrarFallo(solicitud, error);
return ResultadoPago.fallido(error.getMessage());
}
auditoria.registrarExito(solicitud, resultado);
return resultado;
});
}
}7. Spring WebFlux — Programación Reactiva
¿Cuándo usar WebFlux vs Spring MVC?
Spring MVC (Servlet, Blocking):
✅ Aplicaciones CRUD tradicionales
✅ Equipos con menos experiencia en reactive
✅ JPA/JDBC (no hay drivers reactivos maduros para todo)
✅ La mayoría de aplicaciones empresariales
❌ No escala bien con muchas conexiones concurrentes largas (SSE, WebSocket)
Spring WebFlux (Reactive, Non-Blocking):
✅ Muchas conexiones concurrentes (streaming, tiempo real)
✅ Integración entre microservicios
✅ Con R2DBC (reactive DB driver)
✅ Cuando ya usas Mono/Flux en toda la cadena
❌ Curva de aprendizaje alta
❌ Debugging más difícil
❌ No mezclar con código bloqueante (mata el beneficio)
WebClient — La forma moderna de hacer HTTP en Spring
// WebClient reemplaza a RestTemplate (que está deprecado)
// Funciona en MVC y WebFlux
@Configuration
public class WebClientConfig {
@Bean
public WebClient webClient() {
return WebClient.builder()
.baseUrl("https://api.example.com")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.defaultHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
.codecs(c -> c.defaultCodecs().maxInMemorySize(1024 * 1024)) // 1MB
.filter(logRequest())
.filter(logResponse())
.build();
}
private ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(req -> {
log.debug("{} {}", req.method(), req.url());
return Mono.just(req);
});
}
}
// Uso en servicio — modo BLOCKING (dentro de Spring MVC)
@Service
@RequiredArgsConstructor
public class ClienteExternoService {
private final WebClient webClient;
// Llamada bloqueante (aceptable en MVC con @Async)
@Async("ioExecutor")
public CompletableFuture<UsuarioExternoDTO> obtenerUsuarioAsync(String userId) {
UsuarioExternoDTO usuario = webClient.get()
.uri("/usuarios/{id}", userId)
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError, response ->
Mono.error(new EntityNotFoundException("Usuario no encontrado: " + userId))
)
.onStatus(HttpStatusCode::is5xxServerError, response ->
Mono.error(new ServiceUnavailableException("Servicio externo no disponible"))
)
.bodyToMono(UsuarioExternoDTO.class)
.timeout(Duration.ofSeconds(5))
.block(); // bloquea porque estamos en MVC + @Async
return CompletableFuture.completedFuture(usuario);
}
// Llamada con retry
public UsuarioExternoDTO obtenerConRetry(String userId) {
return webClient.get()
.uri("/usuarios/{id}", userId)
.retrieve()
.bodyToMono(UsuarioExternoDTO.class)
.retryWhen(Retry.backoff(3, Duration.ofMillis(500))
.filter(ex -> ex instanceof ServiceUnavailableException))
.block();
}
// Obtener lista con transformación
public List<ProductoExternoDTO> listarProductos(String categoria) {
return webClient.get()
.uri(uri -> uri.path("/productos")
.queryParam("categoria", categoria)
.build())
.retrieve()
.bodyToFlux(ProductoExternoDTO.class)
.collectList()
.block();
}
}Controlador Reactivo básico
// Spring WebFlux Controller — devuelve Mono/Flux en vez de objetos directos
@RestController
@RequestMapping("/api/reactive/productos")
@RequiredArgsConstructor
public class ProductoReactiveController {
private final ProductoReactiveService service;
// Mono — cero o un elemento
@GetMapping("/{id}")
public Mono<ResponseEntity<ProductoDTO>> obtener(@PathVariable Long id) {
return service.buscar(id)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}
// Flux — cero o muchos elementos
@GetMapping
public Flux<ProductoDTO> listar() {
return service.listarTodos();
}
// Server-Sent Events — streaming de datos al cliente
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ProductoDTO> stream() {
return service.listarTodos()
.delayElements(Duration.ofMillis(100)); // uno por uno cada 100ms
}
// Crear con manejo de errores
@PostMapping
public Mono<ResponseEntity<ProductoDTO>> crear(@RequestBody @Valid CrearProductoRequest req) {
return service.crear(req)
.map(dto -> ResponseEntity.status(HttpStatus.CREATED).body(dto))
.onErrorReturn(
DuplicadoException.class,
ResponseEntity.status(HttpStatus.CONFLICT).build()
);
}
}8. Virtual Threads en Spring Boot 3.2+
Configuración — 3 líneas en application.properties
# application.properties (Spring Boot 3.2+ con Java 21)
# Habilitar Virtual Threads para Tomcat
spring.threads.virtual.enabled=true
# Esto reemplaza el ThreadPool tradicional de Tomcat con Virtual Threads
# Cada petición HTTP corre en su propio Virtual Thread
# → Misma escalabilidad que WebFlux, con código blocking simple
# application.yml
spring:
threads:
virtual:
enabled:true// Configuración programática (alternativa)
@Configuration
public class VirtualThreadsConfig {
// Configurar Tomcat para usar Virtual Threads
@Bean
public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutor() {
return protocolHandler ->
protocolHandler.setExecutor(
Executors.newVirtualThreadPerTaskExecutor()
);
}
// Configurar @Async para usar Virtual Threads
@Bean(name = "virtualThreadExecutor")
public Executor virtualThreadExecutor() {
return Executors.newVirtualThreadPerTaskExecutor();
}
// Configurar tareas programadas con Virtual Threads
@Bean
public TaskScheduler taskSchedulerVirtual() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setVirtualThreads(true); // Spring 6.1+
return scheduler;
}
}Impacto real de Virtual Threads
// ANTES (Spring MVC + Platform Threads):
// - Tomcat tiene un pool de ~200 hilos por defecto
// - Cada petición que espera I/O (BD, HTTP, etc.) bloquea un hilo del pool
// - Con 200 peticiones lentas en curso → 201ª petición espera en cola
// DESPUÉS (Spring MVC + Virtual Threads):
// - Cada petición tiene su propio Virtual Thread (~KB de memoria)
// - El Virtual Thread se "desmonta" cuando hace I/O → el carrier thread sirve otro
// - Puede tener millones de peticiones concurrentes
// - El código sigue siendo blocking/imperativo ← misma simplicidad
// Mismo código, mayor escalabilidad:
@RestController
public class ProductoController {
@GetMapping("/{id}")
public ProductoDTO obtener(@PathVariable Long id) {
// Este código es blocking — pero ahora escala como si fuera reactivo
Producto p = repo.findById(id) // I/O bloqueante → VT se desmonta
.orElseThrow(() -> new EntityNotFoundException("No encontrado"));
String precio = webClient.get().uri("/precio/{id}", id)
.retrieve().bodyToMono(String.class).block(); // I/O → VT se desmonta
return new ProductoDTO(p, precio); // sencillo, legible, escalable
}
}@Async con Virtual Threads
@Configuration
@EnableAsync
public class AsyncVirtualConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
// Con Java 21: usar Virtual Threads para @Async
return Executors.newVirtualThreadPerTaskExecutor();
}
}
// Ahora @Async es virtualmente ilimitado en concurrencia
@Service
public class ImportacionMasivaService {
@Async
public CompletableFuture<Void> importar(List<RegistroDTO> registros) {
// Cada llamada @Async crea un Virtual Thread barato
// Puedes llamar esto miles de veces sin preocuparte por el pool
registros.forEach(this::procesarRegistro);
return CompletableFuture.completedFuture(null);
}
}9. Transacciones y Concurrencia
El problema más común: transacciones con operaciones async
// PROBLEMA FRECUENTE: @Transactional + @Async no funcionan como se espera
@Service
public class PedidoService {
// El contexto transaccional NO se propaga a hilos @Async
@Transactional
public void procesarPedido(Long pedidoId) {
Pedido pedido = repo.findById(pedidoId).orElseThrow();
pedido.setEstado("PROCESANDO");
repo.save(pedido);
// Este método @Async corre en OTRO hilo → SIN la transacción actual
notificaciones.enviarEmailAsync(pedido); // ← puede fallar o ver datos sucios
}
}
// ✅ SOLUCIÓN: separar la transacción de la operación async
@Service
public class PedidoService {
@Transactional
public Pedido procesarPedido(Long pedidoId) {
Pedido pedido = repo.findById(pedidoId).orElseThrow();
pedido.setEstado("PROCESANDO");
return repo.save(pedido); // transacción se completa (commit) aquí
}
}
@Service
public class PedidoOrquestador {
@Autowired private PedidoService pedidoService;
@Autowired private NotificacionService notificaciones;
public void procesarYNotificar(Long pedidoId) {
// 1. Primero la transacción (bloqueante, se hace commit)
Pedido pedido = pedidoService.procesarPedido(pedidoId);
// 2. Después el async (transacción ya commiteada → datos consistentes)
notificaciones.enviarEmailAsync(pedido);
}
}Niveles de aislamiento y bloqueos optimistas/pesimistas
public interface PedidoRepository extends JpaRepository<Pedido, Long> {
// Bloqueo pesimista — ningún otro hilo puede leer/modificar el registro
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT p FROM Pedido p WHERE p.id = :id")
Optional<Pedido> findByIdConLockExclusivo(@Param("id") Long id);
// Bloqueo optimista — verifica versión antes de guardar (sin lock en BD)
@Lock(LockModeType.OPTIMISTIC_FORCE_INCREMENT)
Optional<Pedido> findById(Long id);
}
// Entidad con control de versión para bloqueo optimista
@Entity
public class Inventario {
@Id
private Long id;
private int stock;
@Version // ← Spring Data gestiona esto automáticamente
private Long version;
}
// Servicio con manejo de conflictos optimistas
@Service
@RequiredArgsConstructor
public class InventarioService {
private final InventarioRepository repo;
@Transactional
@Retryable( // Spring Retry — reintenta si hay conflicto optimista
value = OptimisticLockingFailureException.class,
maxAttempts = 3,
backoff = @Backoff(delay = 100)
)
public void reducirStock(Long productoId, int cantidad) {
Inventario inv = repo.findById(productoId)
.orElseThrow(() -> new EntityNotFoundException("Producto no encontrado"));
if (inv.getStock() < cantidad) {
throw new StockInsuficienteException("Stock insuficiente para: " + productoId);
}
inv.setStock(inv.getStock() - cantidad);
repo.save(inv);
// Si dos transacciones modificaron el mismo registro → OptimisticLockException
// @Retryable reintenta automáticamente
}
}Propagación de transacciones en contextos concurrentes
@Service
@RequiredArgsConstructor
public class ImportacionService {
private final RegistroRepository repo;
private final ImportacionItemService itemService;
// Procesar miles de registros en paralelo con transacciones independientes
@Transactional(readOnly = true) // lectura optimizada
public ResultadoImportacion importar(List<RegistroDTO> registros) {
AtomicInteger exitosos = new AtomicInteger(0);
AtomicInteger fallidos = new AtomicInteger(0);
// Procesar en paralelo — cada item en su PROPIA transacción
registros.parallelStream()
.forEach(registro -> {
try {
// REQUIRES_NEW → transacción independiente por registro
itemService.procesarConTransaccionPropia(registro);
exitosos.incrementAndGet();
} catch (Exception e) {
log.warn("Falló registro {}: {}", registro.id(), e.getMessage());
fallidos.incrementAndGet();
}
});
return new ResultadoImportacion(exitosos.get(), fallidos.get());
}
}
@Service
public class ImportacionItemService {
// REQUIRES_NEW → siempre una transacción nueva, independiente del llamador
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void procesarConTransaccionPropia(RegistroDTO registro) {
// Si falla, solo hace rollback de ESTE registro, no de todos
Entidad entidad = mapper.toEntidad(registro);
repo.save(entidad);
}
}10. Caché, Sesiones y Thread-Safety en Spring
Spring Cache — thread-safe por diseño
@Configuration
@EnableCaching
public class CacheConfig {
// Caffeine — cache local en memoria de alto rendimiento
@Bean
public CacheManager cacheManager() {
CaffeineCacheManager manager = new CaffeineCacheManager();
manager.setCaffeine(Caffeine.newBuilder()
.expireAfterWrite(10, TimeUnit.MINUTES)
.maximumSize(1000)
.recordStats()
);
return manager;
}
// Redis — cache distribuida para múltiples instancias
@Bean
public RedisCacheManager redisCacheManager(RedisConnectionFactory factory) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(10))
.serializeValuesWith(
RedisSerializationContext.SerializationPair.fromSerializer(
new GenericJackson2JsonRedisSerializer()
)
);
return RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
}
}
// Uso en servicios — @Cacheable es thread-safe
@Service
@RequiredArgsConstructor
public class ProductoService {
// Spring gestiona la sincronización de la cache automáticamente
@Cacheable(value = "productos", key = "#id")
public ProductoDTO obtener(Long id) {
return repo.findById(id)
.map(mapper::toDTO)
.orElseThrow(() -> new EntityNotFoundException("No encontrado: " + id));
}
// Invalidar cache cuando se actualiza
@CacheEvict(value = "productos", key = "#id")
@Transactional
public ProductoDTO actualizar(Long id, ActualizarProductoRequest req) {
Producto p = repo.findById(id).orElseThrow();
mapper.actualizar(p, req);
return mapper.toDTO(repo.save(p));
}
// Actualizar la cache con el nuevo valor
@CachePut(value = "productos", key = "#result.id")
@Transactional
public ProductoDTO crear(CrearProductoRequest req) {
Producto p = mapper.toEntidad(req);
return mapper.toDTO(repo.save(p));
}
// Evict de múltiples entradas
@Caching(evict = {
@CacheEvict(value = "productos", key = "#id"),
@CacheEvict(value = "producto-resumen", key = "#id")
})
@Transactional
public void eliminar(Long id) {
repo.deleteById(id);
}
}Beans de Spring y Thread-Safety
// Los Beans de Spring son SINGLETON por defecto → compartidos entre hilos
// NUNCA guardar estado mutable en campos de un Bean
// es peligroso — estado mutable en un singleton
@Service
public class ProcesadorRoto {
private List<String> resultados = new ArrayList<>(); // ← compartido entre todos los hilos
public List<String> procesar(List<String> datos) {
resultados.clear(); // Thread A limpia
// Thread B puede limpiar aquí también
datos.forEach(d -> resultados.add(transformar(d))); // condición de carrera
return resultados;
}
}
// correcto -> sin estado mutable en el bean
@Service
public class ProcesadorCorrecto {
// Solo campos final e inmutables
private final TransformacionService transformacion;
public List<String> procesar(List<String> datos) {
// Variables locales → en el stack del hilo → siempre thread-safe
return datos.stream()
.map(transformacion::transformar)
.collect(Collectors.toList());
}
}
// Si se llega a necesitar estado compartido, usar estructuras concurrentes
@Service
public class ContadorMetricas {
// Campos concurrentes — thread-safe
private final ConcurrentHashMap<String, AtomicLong> contadores = new ConcurrentHashMap<>();
private final LongAdder totalPeticiones = new LongAdder();
public void registrar(String endpoint) {
contadores.computeIfAbsent(endpoint, k -> new AtomicLong()).incrementAndGet();
totalPeticiones.increment();
}
public Map<String, Long> getSnapshot() {
return contadores.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
}
}@Scope y thread-safety
// SINGLETON (default) — una instancia compartida entre todos los hilos
// → thread-safe si no tiene estado mutable
// PROTOTYPE — nueva instancia en cada inyección
// → thread-safe por diseño (no se comparte)
@Component
@Scope("prototype")
public class ProcesadorConEstado {
private final List<String> buffer = new ArrayList<>(); // ok — no se comparte
public void agregar(String item) { buffer.add(item); }
public List<String> vaciar() {
List<String> resultado = new ArrayList<>(buffer);
buffer.clear();
return resultado;
}
}
// REQUEST scope — nueva instancia por petición HTTP (solo en web)
@Component
@Scope(value = WebApplicationContext.SCOPE_REQUEST, proxyMode = ScopedProxyMode.TARGET_CLASS)
public class ContextoPeticion {
private String usuarioActual;
private String correlationId = UUID.randomUUID().toString();
// seguro usar en un Bean singleton que lo inyecte
}
// SESSION scope — por sesión HTTP
@Component
@Scope(value = WebApplicationContext.SCOPE_SESSION, proxyMode = ScopedProxyMode.TARGET_CLASS)
public class CarritoCompra {
private List<ItemCarrito> items = new ArrayList<>();
// cada usuario tiene su propio carrito
}11. Spring Batch — Procesamiento en Paralelo
Procesamiento paralelo con Spring Batch
// Spring Batch ya gestiona la concurrencia — solo hay que configurarla
@Configuration
@EnableBatchProcessing
public class ImportacionBatchConfig {
@Bean
public Job importarProductosJob(JobBuilderFactory jobs, Step importarStep) {
return jobs.get("importarProductos")
.start(importarStep)
.build();
}
@Bean
public Step importarStep(
StepBuilderFactory steps,
ItemReader<ProductoCSV> reader,
ItemProcessor<ProductoCSV, Producto> processor,
ItemWriter<Producto> writer
) {
return steps.get("importarProductos")
.<ProductoCSV, Producto>chunk(100) // procesar de 100 en 100
.reader(reader)
.processor(processor)
.writer(writer)
// ← PARALELISMO: múltiples hilos procesando chunks simultáneamente
.taskExecutor(batchExecutor())
.throttleLimit(4) // máximo 4 chunks en paralelo
.build();
}
@Bean
public TaskExecutor batchExecutor() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(4);
exec.setMaxPoolSize(8);
exec.setThreadNamePrefix("batch-");
exec.initialize();
return exec;
}
// Partición — dividir el trabajo en partes independientes
@Bean
public Step importarConParticion(StepBuilderFactory steps, ...) {
return steps.get("importarParticionado")
.partitioner("importarStep", partitioner())
.step(importarStep(...))
.gridSize(4) // 4 particiones
.taskExecutor(batchExecutor())
.build();
}
}12. Eventos y Mensajería Asíncrona
Spring Events — Desacoplar con eventos internos
// Definir eventos como Records (inmutables)
record PedidoCreado(Long pedidoId, String clienteEmail, BigDecimal total) {}
record UsuarioRegistrado(Long usuarioId, String email, String nombre) {}
// Publicar eventos en el servicio
@Service
@RequiredArgsConstructor
public class PedidoService {
private final PedidoRepository repo;
private final ApplicationEventPublisher eventPublisher;
@Transactional
public Pedido crear(CrearPedidoRequest request) {
Pedido pedido = mapper.toEntidad(request);
Pedido guardado = repo.save(pedido);
// Publicar evento — los listeners se encargan de lo demás
// Desacopla el servicio de pedidos de notificaciones, inventario, etc.
eventPublisher.publishEvent(
new PedidoCreado(guardado.getId(), request.email(), guardado.getTotal())
);
return guardado;
}
}
// Listeners — pueden ser síncronos o asíncronos
@Component
@RequiredArgsConstructor
@Slf4j
public class PedidoEventListener {
private final EmailService email;
private final InventarioService inventario;
private final AnaliticasService analiticas;
// Síncrono — dentro de la misma transacción si hay una activa
@EventListener
public void onPedidoCreado(PedidoCreado evento) {
inventario.reservar(evento.pedidoId());
}
// Asíncrono — fire and forget en otro hilo
@Async("ioExecutor")
@EventListener
public void enviarConfirmacion(PedidoCreado evento) {
email.enviarConfirmacionPedido(evento.clienteEmail(), evento.pedidoId());
}
// Se ejecuta DESPUÉS del commit de la transacción (muy útil)
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void registrarEnAnaliticas(PedidoCreado evento) {
// Solo se llama si la transacción se commitó exitosamente
analiticas.registrarPedido(evento);
}
// Se ejecuta si la transacción hace rollback
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void limpiarRastreo(PedidoCreado evento) {
log.warn("Pedido {} falló y fue revertido", evento.pedidoId());
}
}Spring + RabbitMQ/Kafka para mensajería asíncrona
// PUBLICAR mensajes a RabbitMQ
@Service
@RequiredArgsConstructor
public class PedidoMessagePublisher {
private final RabbitTemplate rabbitTemplate;
@Transactional
public void publicarPedidoCreado(Pedido pedido) {
PedidoCreadoMessage msg = new PedidoCreadoMessage(
pedido.getId(), pedido.getClienteId(), pedido.getTotal()
);
// Publicar a exchange, el broker enruta a la cola correspondiente
rabbitTemplate.convertAndSend("pedidos.exchange", "pedido.creado", msg);
}
}
// CONSUMIR mensajes de RabbitMQ (en su propio hilo del pool de listeners)
@Component
@RequiredArgsConstructor
@Slf4j
public class NotificacionMessageConsumer {
private final NotificacionService notificaciones;
@RabbitListener(
queues = "pedidos.notificaciones",
concurrency = "3-10" // entre 3 y 10 consumers concurrentes
)
public void procesarPedidoCreado(PedidoCreadoMessage msg) {
log.info("Procesando pedido {} en hilo: {}",
msg.pedidoId(), Thread.currentThread().getName());
notificaciones.enviarConfirmacion(msg.pedidoId());
}
}
// KAFKA — Producer
@Service
@RequiredArgsConstructor
public class EventoKafkaPublisher {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void publicar(String topic, String key, Object mensaje) {
kafkaTemplate.send(topic, key, mensaje)
.thenAccept(result ->
log.debug("Mensaje enviado a partición {} offset {}",
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset())
)
.exceptionally(ex -> {
log.error("Error enviando mensaje a Kafka: {}", ex.getMessage());
return null;
});
}
}
// KAFKA — Consumer
@Component
@Slf4j
public class PedidoKafkaConsumer {
@KafkaListener(
topics = "pedidos-creados",
groupId = "notificaciones-service",
concurrency = "3" // 3 consumidores paralelos
)
public void consumir(
@Payload PedidoCreadoEvent evento,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset
) {
log.info("Recibido evento {} de partición {} offset {}", evento, partition, offset);
procesarEvento(evento);
}
}13. Problemas Comunes y Cómo Evitarlos
Error 1: LazyInitializationException en contextos async
// PROBLEMA: JPA Lazy loading fuera del contexto de la sesión Hibernate
@Entity
public class Pedido {
@OneToMany(fetch = FetchType.LAZY) // ← carga lazy
private List<Item> items;
}
// FALLA: items se accede fuera de la transacción
@Async
public CompletableFuture<ResumenPedido> calcularResumenAsync(Long pedidoId) {
Pedido pedido = repo.findById(pedidoId).orElseThrow(); // sin @Transactional
return CompletableFuture.completedFuture(
new ResumenPedido(pedido.getItems().size()) // LazyInitializationException!
);
}
// SOLUCIÓN 1: Cargar los datos antes del async (dentro de la tx)
public CompletableFuture<ResumenPedido> calcularResumenAsync(Long pedidoId) {
// Cargar dentro de la transacción
ResumenPedido datos = calcularResumenEnTransaccion(pedidoId);
// Ya no hay acceso lazy fuera de la tx
return CompletableFuture.completedFuture(datos);
}
@Transactional(readOnly = true)
public ResumenPedido calcularResumenEnTransaccion(Long pedidoId) {
Pedido pedido = repo.findById(pedidoId).orElseThrow();
return new ResumenPedido(pedido.getItems().size()); // dentro de la tx ✅
}
// SOLUCIÓN 2: JOIN FETCH en la query
public interface PedidoRepository extends JpaRepository<Pedido, Long> {
@Query("SELECT p FROM Pedido p JOIN FETCH p.items WHERE p.id = :id")
Optional<Pedido> findByIdConItems(@Param("id") Long id);
}
// SOLUCIÓN 3: @EntityGraph
@EntityGraph(attributePaths = {"items", "items.producto"})
Optional<Pedido> findById(Long id);Error 2: @Transactional ignorado en llamadas internas
// PROBLEMA: Spring AOP no intercepta llamadas al mismo objeto
@Service
public class PedidoService {
@Transactional
public void procesarPedidoPublico(Long id) {
// Este método llama a otro del mismo bean
procesarPedidoInterno(id); // @Transactional de procesarPedidoInterno es IGNORADO
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void procesarPedidoInterno(Long id) {
// Esta transacción nueva NUNCA se crea — llamada interna sin proxy
}
}
// SOLUCIÓN 1: Extraer a otro Bean
@Service
public class PedidoInternalService {
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void procesarPedidoInterno(Long id) { /* ... */ }
}
// SOLUCIÓN 2: Auto-inyectarse (no recomendado pero funciona)
@Service
public class PedidoService {
@Autowired
@Lazy
private PedidoService self; // proxy de Spring ← llamadas van por el proxy
@Transactional
public void procesarPedidoPublico(Long id) {
self.procesarPedidoInterno(id); // va por el proxy → @Transactional activo
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void procesarPedidoInterno(Long id) { /* ... */ }
}Error 3: N+1 Queries en Streams
// PROBLEMA: consulta N+1 con streams sobre colecciones JPA
// Genera 1 query para productos + N queries para categorías
List<ProductoDTO> productos = repo.findAll().stream() // 1 query
.map(p -> new ProductoDTO(
p.getNombre(),
p.getCategoria().getNombre() // N queries lazy!
))
.toList();
// SOLUCIÓN 1: JOIN FETCH
@Query("SELECT p FROM Producto p JOIN FETCH p.categoria WHERE p.activo = true")
List<Producto> findActivosConCategoria();
// SOLUCIÓN 2: @EntityGraph
@EntityGraph(attributePaths = "categoria")
List<Producto> findByActivoTrue();
// SOLUCIÓN 3: DTO projection con JPQL (más eficiente — trae solo lo necesario)
@Query("SELECT new com.app.dto.ProductoDTO(p.nombre, c.nombre) " +
"FROM Producto p JOIN p.categoria c WHERE p.activo = true")
List<ProductoDTO> findActivosDTO();Error 4: Deadlocks en transacciones concurrentes
// PROBLEMA: Dos transacciones se bloquean mutuamente
// Tx A: UPDATE inventario WHERE id=1, luego UPDATE inventario WHERE id=2
// Tx B: UPDATE inventario WHERE id=2, luego UPDATE inventario WHERE id=1
// → Deadlock garantizado si ejecutan concurrentemente
// SOLUCIÓN 1: Siempre acceder a registros en el mismo orden
@Transactional
public void transferir(Long cuentaOrigenId, Long cuentaDestinoId, BigDecimal monto) {
// Ordenar siempre por ID para evitar deadlock
Long primerLock = Math.min(cuentaOrigenId, cuentaDestinoId);
Long segundoLock = Math.max(cuentaOrigenId, cuentaDestinoId);
Cuenta primera = repo.findByIdConLock(primerLock).orElseThrow();
Cuenta segunda = repo.findByIdConLock(segundoLock).orElseThrow();
Cuenta origen = cuentaOrigenId.equals(primerLock) ? primera : segunda;
Cuenta destino = cuentaDestinoId.equals(primerLock) ? primera : segunda;
origen.debitar(monto);
destino.acreditar(monto);
}
// SOLUCIÓN 2: Bloqueo optimista con reintentos
@Retryable(value = OptimisticLockingFailureException.class, maxAttempts = 3)
@Transactional
public void actualizarInventario(Long id, int delta) {
Inventario inv = repo.findById(id).orElseThrow();
inv.ajustarStock(delta);
repo.save(inv);
}Error 5: Modificar colecciones en streams paralelos
// PROBLEMA: Modificar estado compartido en parallelStream
// Race condition — ArrayList no es thread-safe
List<String> resultados = new ArrayList<>();
datos.parallelStream()
.filter(d -> d.isValido())
.forEach(d -> resultados.add(d.toString())); // ← PELIGROSO
// SOLUCIÓN: usar collect en vez de forEach
List<String> resultados = datos.parallelStream()
.filter(Datos::isValido)
.map(Datos::toString)
.collect(Collectors.toList()); // ← thread-safe, correcto
// O si necesitas acumulación, usar un collector concurrente
ConcurrentHashMap<String, Long> conteo = datos.parallelStream()
.collect(Collectors.toConcurrentMap(
Datos::getCategoria,
d -> 1L,
Long::sum
));📋 Resumen: Concurrencia en Spring Boot
| Necesidad | Herramienta de Spring |
|---|---|
| Método que no bloquea la petición | @Async + CompletableFuture |
| Muchas peticiones I/O concurrentes | Virtual Threads (spring.threads.virtual.enabled=true) |
| Streaming de datos al cliente | Spring WebFlux + Flux / SSE |
| Llamadas paralelas a microservicios | WebClient + CompletableFuture.allOf() |
| Cache thread-safe | @Cacheable (Caffeine o Redis) |
| Procesar grandes volúmenes de datos | Spring Batch con taskExecutor y throttleLimit |
| Desacoplar acciones post-transacción | @TransactionalEventListener(AFTER_COMMIT) |
| Mensajes asíncronos entre servicios | RabbitMQ / Kafka con @RabbitListener / @KafkaListener |
| Evitar deadlocks en JPA | @Lock(PESSIMISTIC_WRITE) + orden consistente |
| Reintento ante fallos concurrentes | @Retryable con OptimisticLockingFailureException |
🔬 Ejercicio Final: API de E-Commerce con concurrencia
// Implementa un endpoint POST /pedidos que:
// 1. Valide stock de cada producto (transacción con bloqueo optimista)
// 2. Llame al servicio de precios en paralelo (WebClient + CompletableFuture)
// 3. Reserve el inventario (transacción propia con @Retryable)
// 4. Publique evento AFTER_COMMIT para notificaciones
// 5. Retorne la respuesta sin esperar las notificaciones
@RestController
@RequestMapping("/api/pedidos")
@RequiredArgsConstructor
public class PedidoController {
private final PedidoOrquestador orquestador;
@PostMapping
public ResponseEntity<PedidoResponse> crear(@RequestBody @Valid CrearPedidoRequest req) {
return ResponseEntity.status(HttpStatus.CREATED)
.body(orquestador.crearPedido(req));
}
}
@Service
@RequiredArgsConstructor
@Transactional
public class PedidoOrquestador {
private final InventarioService inventario;
private final PrecioService precios;
private final PedidoRepository repo;
private final ApplicationEventPublisher events;
public PedidoResponse crearPedido(CrearPedidoRequest req) {
// 1. Precios en paralelo (async, no bloqueante del hilo principal si usamos VT)
Map<Long, BigDecimal> preciosActuales = precios.obtenerPreciosParalelo(
req.items().stream().map(ItemRequest::productoId).toList()
);
// 2. Reservar inventario con bloqueo optimista + retry
req.items().forEach(item ->
inventario.reservar(item.productoId(), item.cantidad())
);
// 3. Calcular total y guardar pedido
BigDecimal total = req.items().stream()
.map(item -> preciosActuales.get(item.productoId())
.multiply(BigDecimal.valueOf(item.cantidad())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
Pedido pedido = new Pedido(req.clienteId(), req.items(), total);
Pedido guardado = repo.save(pedido);
// 4. Evento post-commit (notificaciones no bloquean la respuesta)
events.publishEvent(new PedidoCreado(guardado.getId(), req.email(), total));
return mapper.toResponse(guardado);
// La respuesta se retorna ANTES de que el email se envíe
}
}📚 Recursos recomendados:
- Spring Boot Reference Documentation — docs.spring.io
- Spring in Action — Craig Walls
- Spring Blog: “Virtual Threads in Spring Boot 3.2” — spring.io/blog
- Baeldung: Spring @Async, Spring Transactions, WebFlux Guide
- JEP 444: Virtual Threads — openjdk.org