개발일지
Nest.js kafka
index.ys
2023. 12. 3. 12:09
코드 구조
api-gateway
- 3000번 포트로 들어온 요청 9092포트로 분산처리
posts.controller.ts
- transport: Transport.KAFKA : 카프카 클라이언트 설정, 옵션 지정
- onModuleInit() : 애플리케이션이 초기화 될떄 실행되는 메서드, kafka 주제인 add.new.post get.posts.list에 대한 응답을 구독하고 클라이언트를 연결함
- post : 받아와서 /posts/' 엔드포인트에서 데이터를 받아와서 'add.new.post' 주제로 Kafka에 메시지를 전송
- get : GET 요청에 대한 핸들러로, '/posts/' 엔드포인트에서 'get.posts.list' 주제로 Kafka에 메시지를 전송
import {Body, Controller, Get, Post} from '@nestjs/common';
import {Client, ClientKafka, Transport} from "@nestjs/microservices";
import {IPost} from "./interfaces/post.interface";
@Controller('posts')
export class PostsController {
//카프카 클라이언트 설정
@Client({
//프로토콜을 통해 통신
transport: Transport.KAFKA,
//
options: {
client: {
clientId: 'posts',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'posts-consumer'
}
}
})
client: ClientKafka;
//애플리케이션이 초기화 될떄 호출됨
//kafka클라이언트를 초기화하고 특정 주제에 대한 응답을 구독
async onModuleInit() {
this.client.subscribeToResponseOf('add.new.post');
this.client.subscribeToResponseOf('get.posts.list');
await this.client.connect();
}
//POST요청처리
//addPost 메서드정의, add.new.post주제로 데이터를 전송함
@Post('/')
appPost(@Body() post: IPost) {
//게시물에 대한 데이터 정의
// export interface IPost {
// title: string;
// description: string;
// }
console.log(post)
return this.client.send('add.new.post', post);
}
//get.posts.list로 데이터를 전송함
@Get('/')
getList() {
return this.client.send('get.posts.list', '');
}
}
posts-service
posts.controller.ts
- MessagePattern('get.posts.list') : 메시지 패턴 데코레이터, get.posts.list라는 주제의 메세지를 처리하는 핸들러
- MessagePattern('add.new.post') : 메시지 패턴 데코레이터, Kafka 주제에 대한 메시지를 처리하는 핸들러
- 메세지로 전달된 message인자의 value를 추출해 postsService에서 제공하는 addPosts 메서드로 전달
import { Controller } from '@nestjs/common';
import {PostsService} from "./posts.service";
import {MessagePattern, Payload} from "@nestjs/microservices";
import {IKafkaMessage} from "../interfaces/kafka-message.interface";
import {IPost} from "./interfaces/post.interface";
@Controller()
export class PostsController {
constructor(private postsService: PostsService) {}
@MessagePattern('get.posts.list')
getPosts() {
return this.postsService.getList();
}
@MessagePattern('add.new.post')
addPost(@Payload() message: IKafkaMessage<IPost>) {
return this.postsService.addPost(message.value);
}
}
posts.service.ts
- addPosts: 새로운 포스트를 배열에 추가하고 추가된 포스트를 반환함
- getLists: 현재까지 추가된 모든 포스트를 반환함, posts 배열자체를 반환함
import { Injectable } from '@nestjs/common';
import {IPost} from "./interfaces/post.interface";
@Injectable()
export class PostsService {
//배열선언
//export interface IPost ={
//title: string;
//decription: string;
//}
posts: Array<IPost>;
//ex) [title, description]
constructor() {
this.posts = [];
}
addPost(post: IPost): IPost {
this.posts.push(post);
return this.posts[this.posts.length - 1];
}
getList(): Array<IPost> {
return this.posts;
}
}
도커 컴포즈 실행
- 백그라운드에서 컴포즈 파일 실행
docker-compose up -d
- 실행된 컨테이너
근데 왜 요청이 안가냐...?
- 서버에 로그도 안뜸...대답없는너...

참고 깃허브
https://github.com/js-code-ua/nest-kafka-example
GitHub - js-code-ua/nest-kafka-example: youtube video project
youtube video project. Contribute to js-code-ua/nest-kafka-example development by creating an account on GitHub.
github.com