티스토리 뷰

반응형

AWS SQS FIFO란?

메시지가 전송된 순서대로 처리되는 방식의 SQS 대기열로,

먼저 전송된 메시지가 먼저 소비됨 메시지 순서가 중요한 애플리케이션에서 유용함

 

특징

  • 선입선출 보장 - 메시지가 전송된 순서대로 처리
  • 중복 제거 - 중복 메시지는 제거
  • 메시지 그룹 ID - 여러 생산자 간의 메시지 순서를 유지하기 위해 그룹 ID를 유지
  • 처리량 - 일부 AWS 리전에서는 API 작업당 70,000TPS(초당 트랜잭션)까지 처리량이 증가할 수 있습니다.
  • DLQ 지원 - Dead Letter Queue (DLQ)에 대한 Redrive 지원을 통해 실패한 메시지를 처리 가능

SQS 만들기

 

SqsConfig

package com.ssu.ssuketing.common.config;

import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

import java.time.Duration;


@Configuration
public class SqsConfig {
    private static final Logger log = LoggerFactory.getLogger(SqsConfig.class);
    @Value("${spring.cloud.aws.credentials.access-key}")
    private String accessKey;

    @Value("${spring.cloud.aws.credentials.secret-key}")
    private String secretKey;

    @Value("${spring.cloud.aws.region.static}")
    private String region;

    // SQS Client
    @Bean
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.builder()
                .region(Region.of(region))
                .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey,secretKey)))
                .build();
    }

    // Listener Factory 설정
    @Bean
    public SqsMessageListenerContainerFactory<Object> defaultSqsMessageListenerContainerFactory(){
        return SqsMessageListenerContainerFactory.
                builder()
                .configure(options -> {
                    options.acknowledgementMode(AcknowledgementMode.ALWAYS);
                    options.pollTimeout(Duration.ofSeconds(1));
                    options.maxMessagesPerPoll(10);
                    log.info("AcknowledgementMode has been set to: ALWAYS");
                })
                .sqsAsyncClient(sqsAsyncClient())
                .build();
    }

    // SQS 템플릿
    @Bean
    public SqsTemplate sqsTemplate() {
        return SqsTemplate.newTemplate(sqsAsyncClient());
    }
}

 

SqsMessageSender

package com.ssu.ssuketing.common.sqs;

import io.awspring.cloud.sqs.operations.SendResult;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

@Component
public class SqsMessageSender {

    private final SqsTemplate template;

    public SqsMessageSender(SqsAsyncClient sqsAsyncClient) {
        this.template = SqsTemplate.newTemplate(sqsAsyncClient);
    }

    public SendResult<String> sendReservationMessage(String queueUrl, String groupId, String messageDeduplicationId, String message) {
        return template.send(to -> to
                .queue(queueUrl)
                .messageGroupId(groupId)
                .messageDeduplicationId(messageDeduplicationId)
                .payload(message));
    }

}

 

SqsMessageListener

package com.ssu.ssuketing.common.sqs;

import com.ssu.ssuketing.service.ReservationService;
import io.awspring.cloud.sqs.annotation.SqsListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
@RequiredArgsConstructor
@Slf4j
public class SqsMessageListener {

    private final ReservationService reservationService;

    @SqsListener(value = "${spring.cloud.aws.sqs.queue-name}", factory = "defaultSqsMessageListenerContainerFactory")
    void handleReservationQueueMessage(String payload) {
        try {
            // 메시지 처리 시도
            System.out.println("리스너 진입 시간: "+ LocalDateTime.now());
            reservationService.processReservationFromQueue(payload);
            log.info("메시지 처리 성공" + LocalDateTime.now());

        } catch (Exception e) {
            log.error("메시지 처리 중 오류 발생: {}", e.getMessage(), e);
        }
    }

}

 

SqsProperties

package com.ssu.ssuketing.common.sqs;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "aws.sqs")
public class SqsProperties {

    private String queueName;

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }
}

 

processReservationFromQueue

public void processReservationFromQueue(String message) {
      SqsReservationListenerDto sqsReservationListenerDto;
      try {
          sqsReservationListenerDto = objectMapper.readValue(message, SqsReservationListenerDto.class);
      } catch (JsonProcessingException e) {
          throw new RuntimeException("Internal Server Error");
      }

      // 정보 가져오기
      Ticket ticket = sqsReservationListenerDto.getTicket();
      Reservation reservation = sqsReservationListenerDto.getReservation();

      // 여석 검사
      int reservationCount = Math.toIntExact(reservationRepository.findCountReservationByTicket(ticket, ReservationStatus.CONFIRMED));
      if (reservationCount >= ticket.getCapacity()) {
          reservation.setStatus(ReservationStatus.REJECT);
          reservationRepository.save(reservation);
//            reservationRepository.updateStatusOnlyById(reservation.getId(), ReservationStatus.REJECT);
          throw new FullReservationException(ReservationErrorMessage.RESERVATION_FULL);
      }

      reservation.setStatus(ReservationStatus.CONFIRMED);
      reservationRepository.save(reservation);
//        reservationRepository.updateStatusAndConfirmedAtById(reservation.getId(),ReservationStatus.CONFIRMED, LocalDateTime.now());
  }

 

Application.yml

# AWS
cloud:
  aws:
    region:
      static: ap-northeast-2
    credentials:
      secret-key: EdX5n/s3hjvSg5AcCP4r1iOtk0GunH/Mi0uzkEU0
      access-key: AKIA45Y2RWENR4RMT5V5
    sqs:
      endpoint: https://sqs.ap-northeast-2.amazonaws.com/888577044763/Reservation.fifo
      queue-name : Reservation.fifo
반응형
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/05   »
1 2 3
4 5 6 7 8 9 10
11 12 13 14 15 16 17
18 19 20 21 22 23 24
25 26 27 28 29 30 31
글 보관함