개발일지

Nest.js kafka

index.ys 2023. 12. 3. 12:09

코드 구조

api-gateway

  • 3000번 포트로 들어온 요청 9092포트로 분산처리

posts.controller.ts

  • transport: Transport.KAFKA : 카프카 클라이언트 설정, 옵션 지정
  • onModuleInit() : 애플리케이션이 초기화 될떄 실행되는 메서드, kafka 주제인 add.new.post    get.posts.list에 대한 응답을 구독하고 클라이언트를 연결함
  • post : 받아와서 /posts/' 엔드포인트에서 데이터를 받아와서 'add.new.post' 주제로 Kafka에 메시지를 전송
  • get : GET 요청에 대한 핸들러로, '/posts/' 엔드포인트에서 'get.posts.list' 주제로 Kafka에 메시지를 전송
import {Body, Controller, Get, Post} from '@nestjs/common';
import {Client, ClientKafka, Transport} from "@nestjs/microservices";
import {IPost} from "./interfaces/post.interface";

@Controller('posts')
export class PostsController {
    //카프카 클라이언트 설정
    @Client({
        //프로토콜을 통해 통신
        transport: Transport.KAFKA,
        //
        options: {
            client: {
                clientId: 'posts',
                brokers: ['localhost:9092'],
            },
            consumer: {
                groupId: 'posts-consumer'
            }
        }
    })
    client: ClientKafka;

    //애플리케이션이 초기화 될떄 호출됨
    //kafka클라이언트를 초기화하고 특정 주제에 대한 응답을 구독
    async onModuleInit() {
        this.client.subscribeToResponseOf('add.new.post');
        this.client.subscribeToResponseOf('get.posts.list');

        await this.client.connect();
    }

    //POST요청처리
    //addPost 메서드정의, add.new.post주제로 데이터를 전송함
    @Post('/')
    appPost(@Body() post: IPost) {
        //게시물에 대한 데이터 정의
        // export interface IPost {
        //     title: string;
        //     description: string;
        // }
        console.log(post)
        return this.client.send('add.new.post', post);
    }

    //get.posts.list로 데이터를 전송함
    @Get('/')
    getList() {
        return this.client.send('get.posts.list', '');
    }
}

posts-service

posts.controller.ts

  • MessagePattern('get.posts.list') : 메시지 패턴 데코레이터, get.posts.list라는 주제의 메세지를 처리하는 핸들러
  • MessagePattern('add.new.post') : 메시지 패턴 데코레이터, Kafka 주제에 대한 메시지를 처리하는 핸들러
    • 메세지로 전달된 message인자의 value를 추출해 postsService에서 제공하는 addPosts 메서드로 전달
import { Controller } from '@nestjs/common';
import {PostsService} from "./posts.service";
import {MessagePattern, Payload} from "@nestjs/microservices";
import {IKafkaMessage} from "../interfaces/kafka-message.interface";
import {IPost} from "./interfaces/post.interface";

@Controller()
export class PostsController {
    constructor(private postsService: PostsService) {}

    @MessagePattern('get.posts.list')
    getPosts() {
        return this.postsService.getList();
    }

    @MessagePattern('add.new.post')
    addPost(@Payload() message: IKafkaMessage<IPost>) {
        return this.postsService.addPost(message.value);
    }
}

posts.service.ts

  • addPosts: 새로운 포스트를 배열에 추가하고 추가된 포스트를 반환함
  • getLists: 현재까지 추가된 모든 포스트를 반환함, posts 배열자체를 반환함
import { Injectable } from '@nestjs/common';
import {IPost} from "./interfaces/post.interface";

@Injectable()
export class PostsService {
	//배열선언
    //export interface IPost ={
		//title: string;
        //decription: string;
	//}
    posts: Array<IPost>;
    //ex) [title, description]

    constructor() {
        this.posts = [];
    }

    addPost(post: IPost): IPost {
        this.posts.push(post);
        return this.posts[this.posts.length - 1];
    }

    getList(): Array<IPost> {
        return this.posts;
    }
}

도커 컴포즈 실행

  • 백그라운드에서 컴포즈 파일 실행
docker-compose up -d

  • 실행된 컨테이너

근데 왜 요청이 안가냐...?

  • 서버에 로그도 안뜸...대답없는너...

참고 깃허브

https://github.com/js-code-ua/nest-kafka-example

 

GitHub - js-code-ua/nest-kafka-example: youtube video project

youtube video project. Contribute to js-code-ua/nest-kafka-example development by creating an account on GitHub.

github.com