programing

스프링 임베디드 Kafka를 사용한 @Kafka Listener 테스트

codeshow 2023. 3. 18. 09:12
반응형

스프링 임베디드 Kafka를 사용한 @Kafka Listener 테스트

Spring Boot 2.x를 사용하여 개발 중인 Kafka 청취자용 유닛 테스트를 작성하려고 합니다. 유닛 테스트이기 때문에 Zookeeper의 인스턴스인 풀 Kafka 서버를 시작하고 싶지 않습니다.그래서 Spring Embedded Kafka를 사용하기로 했습니다.

내 청취자의 정의는 매우 기본적이다.

@Component
public class Listener {
    private final CountDownLatch latch;

    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }

    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}

또한 테스트를 통해 이 테스트에 의해latch메시지 수신 후 0이 되는 카운터는 매우 쉽습니다.

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {

    @Autowired
    private KafkaEmbedded embeddedKafka;

    @Autowired
    private CountDownLatch latch;

    private KafkaTemplate<Integer, String> producer;

    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }

    private KafkaTemplate<Integer, String> buildKafkaTemplate() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    @Test
    public void listenerShouldConsumeMessages() throws InterruptedException {
        // Given
        producer.sendDefault(1, "Hello world");
        // Then
        assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
    }
}

아쉽게도 시험에 떨어졌는데 왜 그런지 모르겠어요.의 인스턴스를 사용할 수 있습니까?KafkaEmbedded주석으로 표시한 방법을 시험하다@KafkaListener?

모든 코드는 GitHub 저장소 kafka-listener에서 공유됩니다.

여러분 덕분이에요.

사용자에게 주제/파티션이 할당되기 전에 메시지를 보내고 있을 수 있습니다.속성 설정...

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

...기본값은 입니다.latest.

이건 마치--from-beginning콘솔 컨슈머에 접속합니다.

편집

오; 부트 속성을 사용하지 않는군요.

더하다

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

편집 2

그나저나, 너도 한번 해봐.get(10L, TimeUnit.SECONDS)의 결과에 따라template.send()(a)Future<>송신에 성공했다고 단언합니다.

편집 3

테스트에 대해서만 오프셋 재설정을 재정의하려면 브로커 주소에 대해 수행한 것과 동일한 작업을 수행할 수 있습니다.

@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;

...

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);

그리고.

@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.auto-offset-reset=earliest"})

단, 이 속성은 그룹이 처음 소비할 때만 적용됩니다.앱을 시작할 때마다 항상 마지막에 시작하려면 시작 시 끝까지 탐색해야 합니다.

또, 세팅하는 것을 추천합니다.enable.auto.commit로.false컨테이너가 시간 스케줄에 따라 수행하는 소비자 클라이언트에 의존하지 않고 오프셋을 커밋할 수 있도록 합니다.

아마 누군가가 이것을 유용하게 여길 것이다.저도 비슷한 문제가 있었어요.로컬로 테스트가 실행 중(일부 체크가 실행됨)Awaitility.waitAtMostJenkins 파이프라인에서는 테스트가 실패했습니다.

가장 많이 투표된 답변에서 이미 언급된 바와 같이 솔루션은auto-offset-reset=earliest테스트 실행 중에는 테스트 로그를 조사하여 설정이 올바른지 여부를 확인할 수 있습니다.생산자와 소비자 모두를 위한 스프링 출력 구성

언급URL : https://stackoverflow.com/questions/50123621/testing-a-kafkalistener-using-spring-embedded-kafka

반응형