스프링 임베디드 Kafka를 사용한 @Kafka Listener 테스트
Spring Boot 2.x를 사용하여 개발 중인 Kafka 청취자용 유닛 테스트를 작성하려고 합니다. 유닛 테스트이기 때문에 Zookeeper의 인스턴스인 풀 Kafka 서버를 시작하고 싶지 않습니다.그래서 Spring Embedded Kafka를 사용하기로 했습니다.
내 청취자의 정의는 매우 기본적이다.
@Component
public class Listener {
private final CountDownLatch latch;
@Autowired
public Listener(CountDownLatch latch) {
this.latch = latch;
}
@KafkaListener(topics = "sample-topic")
public void listen(String message) {
latch.countDown();
}
}
또한 테스트를 통해 이 테스트에 의해latch
메시지 수신 후 0이 되는 카운터는 매우 쉽습니다.
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {
@Autowired
private KafkaEmbedded embeddedKafka;
@Autowired
private CountDownLatch latch;
private KafkaTemplate<Integer, String> producer;
@Before
public void setUp() {
this.producer = buildKafkaTemplate();
this.producer.setDefaultTopic("sample-topic");
}
private KafkaTemplate<Integer, String> buildKafkaTemplate() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
return new KafkaTemplate<>(pf);
}
@Test
public void listenerShouldConsumeMessages() throws InterruptedException {
// Given
producer.sendDefault(1, "Hello world");
// Then
assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
}
}
아쉽게도 시험에 떨어졌는데 왜 그런지 모르겠어요.의 인스턴스를 사용할 수 있습니까?KafkaEmbedded
주석으로 표시한 방법을 시험하다@KafkaListener
?
모든 코드는 GitHub 저장소 kafka-listener에서 공유됩니다.
여러분 덕분이에요.
사용자에게 주제/파티션이 할당되기 전에 메시지를 보내고 있을 수 있습니다.속성 설정...
spring:
kafka:
consumer:
auto-offset-reset: earliest
...기본값은 입니다.latest
.
이건 마치--from-beginning
콘솔 컨슈머에 접속합니다.
편집
오; 부트 속성을 사용하지 않는군요.
더하다
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
편집 2
그나저나, 너도 한번 해봐.get(10L, TimeUnit.SECONDS)
의 결과에 따라template.send()
(a)Future<>
송신에 성공했다고 단언합니다.
편집 3
테스트에 대해서만 오프셋 재설정을 재정의하려면 브로커 주소에 대해 수행한 것과 동일한 작업을 수행할 수 있습니다.
@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;
...
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);
그리고.
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.auto-offset-reset=earliest"})
단, 이 속성은 그룹이 처음 소비할 때만 적용됩니다.앱을 시작할 때마다 항상 마지막에 시작하려면 시작 시 끝까지 탐색해야 합니다.
또, 세팅하는 것을 추천합니다.enable.auto.commit
로.false
컨테이너가 시간 스케줄에 따라 수행하는 소비자 클라이언트에 의존하지 않고 오프셋을 커밋할 수 있도록 합니다.
아마 누군가가 이것을 유용하게 여길 것이다.저도 비슷한 문제가 있었어요.로컬로 테스트가 실행 중(일부 체크가 실행됨)Awaitility.waitAtMost
Jenkins 파이프라인에서는 테스트가 실패했습니다.
가장 많이 투표된 답변에서 이미 언급된 바와 같이 솔루션은auto-offset-reset=earliest
테스트 실행 중에는 테스트 로그를 조사하여 설정이 올바른지 여부를 확인할 수 있습니다.생산자와 소비자 모두를 위한 스프링 출력 구성
언급URL : https://stackoverflow.com/questions/50123621/testing-a-kafkalistener-using-spring-embedded-kafka
'programing' 카테고리의 다른 글
React에서 상태 비저장 구성 요소의 참조에 연결하는 방법은 무엇입니까? (0) | 2023.03.18 |
---|---|
Spring REST 서비스: json 응답의 null 개체를 제거하도록 구성하는 방법 (0) | 2023.03.18 |
리액트 훅을 사용한 상태 갱신 시 비동기 코드 실행 (0) | 2023.03.18 |
참조 오류:Jest 환경이 해체된 후 파일을 '가져오기'하려고 합니다. (0) | 2023.03.18 |
Facebook의 앱 내 브라우저가 "net::ERR_FAILED"이지만 다른 브라우저는 없습니다. (0) | 2023.03.18 |