상세 컨텐츠

본문 제목

Node로 메세징 개념 잡깅

NodeJS

by e7e 2023. 4. 3. 14:44

본문

사용한 Node 버전은 18.15.0 이디용!

 

import  어쩌구 from '파일명.js'  를 사용하기 위해서

 

package.json 파일에 "type":"module" 추가 해주삼

 

서버(e7e/index.js)

import { createServer } from 'http'
import staticHandler from 'serve-handler'
import WebSocket, { WebSocketServer } from 'ws'

// HTTP server: every request is handed to serve-handler with `e7e` as its
// `public` option.
const server = createServer((request, response) => staticHandler(request, response, { public: 'e7e' }));

// WebSocket server attached to the HTTP server above.
const wss = new WebSocketServer({ server });

// For each new WebSocket connection, log it and relay every message that
// connection sends to all connected clients via broadCast().
wss.on('connection', (socket) => {
    console.log('Client Connected');
    socket.on('message', function relayMessage(msg) {
        console.log(`Message: ${msg}`);
        broadCast(msg);
    });
});

/**
 * Sends a message to every client of `wss` whose connection is open.
 *
 * @param {*} msg - Payload to relay; it is converted with `toString('utf-8')`
 *   right before being sent.
 */
function broadCast(msg){
    for (const peer of wss.clients) {
        // Skip sockets that are not in the OPEN state.
        if (peer.readyState !== WebSocket.OPEN) {
            continue;
        }
        console.log("서버:",msg);
        peer.send(msg.toString('utf-8'));
    }
}


// Port: first CLI argument, or 8272 when that argument is absent or empty.
const listenPort = process.argv[2] || 8272;
server.listen(listenPort);

클라이언트(e7e/index.html)

<!DOCTYPE html>
<html lang="ko">
<head>
    <meta charset="UTF-8">
    <title>Chat With E7E</title>
</head>
<body>
    메세징:
    <div id="message"></div>
    <form id="msgForm">
        <input type="text" id="msgBox" placeholder="메세지 쓰삼">
        <button>Send</button>
    </form>
<script>
    // DOM nodes used by the chat UI.
    const message = document.querySelector("#message");
    const msgForm = document.querySelector("#msgForm");
    const msgBox = document.querySelector("#msgBox");

    // Connect back to the host that served this page. Use wss:// when the page
    // was loaded over HTTPS, because browsers block plain ws:// from secure pages.
    const wsScheme = window.location.protocol === "https:" ? "wss" : "ws";
    const ws = new WebSocket(`${wsScheme}://${window.location.host}`);

    // Append every incoming chat message as its own <div>.
    ws.onmessage = (msg) => {
        const msgDiv = document.createElement("div");
        console.log(msg);
        // textContent instead of innerHTML: the text comes from other users, so
        // it must never be parsed as HTML (script/markup injection).
        msgDiv.textContent = msg.data;
        message.appendChild(msgDiv);
    };

    // Send the typed text over the socket instead of doing a normal form submit.
    // The event is taken as a parameter rather than read from the non-standard
    // global `window.event`.
    msgForm.addEventListener("submit", (event) => {
        event.preventDefault();
        ws.send(msgBox.value);
        msgBox.value = "";
    });
</script>
</body>
</html>
node index.js

 

========================>

Broker(Redis)를 사용하는 중앙집중식 구조: 서로 다른 서버끼리 Pub/Sub(발행/구독) 패턴으로 통신

먼저 redis 서버 설치 및 실행

Windows는 공식적으로 지원이 안 돼서 docker나 wsl2 사용

여기서는 wsl2 사용해 봄

# key 설치
curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg

# repository 추가
echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list

# 패키지 업데이트 와 업그레이드
sudo apt update && sudo apt upgrade -y

# 설치
sudo apt install redis

# 시작
sudo service redis-server start

# 접속 확인
redis-cli 
127.0.0.1:6379> ping
PONG

docker롱!(위에 했으면 할 필요 업디용)

docker run --name myredis -d -p 6379:6379 redis
docker ps -a ## 상태확인
docker logs myredis ## 문제시 로그 확인

 

서버(e7e/index2.js)

import { createServer } from 'http'
import staticHandler from 'serve-handler'
import WebSocket, { WebSocketServer } from 'ws'
import Redis from 'ioredis'

// Channel shared by every server instance for chat traffic.
const CHAT_CHANNEL = 'chat_message';

// Two Redis connections: one only subscribes, the other only publishes.
const redisSub = new Redis();
const redisPub = new Redis();

// HTTP server: every request is handed to serve-handler with `e7e` as its
// `public` option.
const server = createServer((req, res) => {
    return staticHandler(req, res, { public: 'e7e' });
});

const wss = new WebSocketServer({ server });

// Messages from local WebSocket clients are not broadcast directly; they are
// published to Redis and come back through the subscription below.
wss.on('connection', client => {
    console.log('Client Connected');
    client.on('message', msg => {
        console.log(`Message: ${msg}`);
        // publish() returns a promise; handle rejection so a Redis failure is
        // logged instead of surfacing as an unhandled rejection.
        redisPub.publish(CHAT_CHANNEL, msg)
            .catch(err => console.error('Failed to publish chat message:', err));
    });
});

// Relay everything received on the channel to all open WebSocket clients.
redisSub.subscribe(CHAT_CHANNEL)
    .catch(err => console.error(`Failed to subscribe to ${CHAT_CHANNEL}:`, err));
redisSub.on('message', (channel, msg) => {
    console.log("채널확인: ", channel);
    for (const client of wss.clients) {
        if (client.readyState === WebSocket.OPEN) {
            client.send(msg.toString('utf-8'));
        }
    }
});


// Port: first CLI argument, or 8272 when that argument is absent or empty.
server.listen(process.argv[2] || 8272);

클라이언트(e7e/index.html)  변화 없음

서버 포트 바꿔서 여러개 돌려서 잘 되는 지 테스통!

node index2.js 9004

node index2.js 9014

node index2.js 9024

 

 

========================>

zeromq를 사용하여 P2P 방식의 Pub/Sub 패턴으로 전환

서버(e7e/index3.js)

import { createServer } from 'http'
import staticHandler from 'serve-handler'
import WebSocket, { WebSocketServer } from 'ws'
import yargs from 'yargs/yargs'
import { hideBin } from 'yargs/helpers'
import zmq from 'zeromq'

// Parsed command-line flags; hideBin() is applied to process.argv first.
const cliArgs = hideBin(process.argv);
const argv = yargs(cliArgs).argv;

// HTTP server: every request is handed to serve-handler with `e7e` as its
// `public` option.
const server = createServer((request, response) => staticHandler(request, response, { public: 'e7e' }));

// Publisher socket; assigned inside initializeSockets() and also used by the
// WebSocket message handler.
let pubSocket;

/**
 * Binds this server's ZeroMQ publisher on `argv.pub`, connects a subscriber to
 * every port listed in `argv.sub`, and relays each received chat message to the
 * local WebSocket clients through broadCast().
 *
 * The returned promise does not resolve while the subscriber keeps yielding
 * messages; it rejects if binding or receiving fails.
 *
 * @returns {Promise<void>}
 */
async function initializeSockets(){
    pubSocket = new zmq.Publisher();
    await pubSocket.bind(`tcp://127.0.0.1:${argv.pub}`);

    const subSocket = new zmq.Subscriber();
    // [].concat() normalises a single value and a repeated flag to an array.
    // With no --sub at all there is nothing to connect to (previously this
    // produced a connection attempt to port "undefined").
    const subPorts = argv.sub == null ? [] : [].concat(argv.sub);
    for (const port of subPorts) {
        console.log(`Subscribing to ${port}`);
        subSocket.connect(`tcp://127.0.0.1:${port}`);
    }
    subSocket.subscribe('chat');

    for await (const [msg] of subSocket) {
        console.log(`Message from another server: ${msg}`);
        // Messages are published as "chat <text>". Strip only that leading
        // topic and its separator: splitting on every space and taking index 1
        // dropped everything after the first word of the text.
        const text = msg.toString();
        broadCast(text.slice(text.indexOf(' ') + 1));
    }
}

// Log failures (e.g. the publisher port is already in use) instead of leaving
// the promise rejection unhandled.
initializeSockets().catch(err => console.error(err));

const wss = new WebSocketServer({ server });

// Tail of the chain of pending publishes. Each send starts only after the
// previous one settled, so two client messages arriving back-to-back never
// call pubSocket.send() concurrently.
// NOTE(review): zeromq.js documents that only one send may be in progress per
// socket — confirm against the installed version.
let pendingPublish = Promise.resolve();

// A message from a local client is broadcast to the local clients right away
// and published as "chat <text>" for the other servers.
wss.on('connection', client => {
    console.log('Client Connected');
    client.on('message', msg => {
        console.log(`Message: ${msg}`);
        broadCast(msg);
        pendingPublish = pendingPublish
            .then(() => pubSocket.send(`chat ${msg}`))
            // Also covers the case where pubSocket is not assigned yet.
            .catch(err => console.error('Failed to publish chat message:', err));
    });
});


/**
 * Relays a message to all clients of `wss` that are currently open.
 *
 * @param {*} msg - Payload to relay; converted with `toString('utf-8')` for
 *   each recipient.
 */
function broadCast(msg){
    for (const recipient of wss.clients) {
        const isOpen = recipient.readyState === WebSocket.OPEN;
        if (!isOpen) {
            continue;
        }
        console.log("서버:",msg);
        recipient.send(msg.toString('utf-8'));
    }
}

// Port: the --http flag, or 8272 when the flag is absent (or otherwise falsy).
const httpPort = argv.http || 8272;
server.listen(httpPort);

아래처럼 서버 여러 개 실행 후 테스통 (클라이언트는 동일: e7e/index.html)

node index3.js --http 8080 --pub 5000 --sub 5001 --sub 5002
node index3.js --http 8081 --pub 5001 --sub 5000 --sub 5002
node index3.js --http 8082 --pub 5002 --sub 5000 --sub 5001

 

========================>

메세지 Queue 사용도 해 볼까용

rabbitmq 서버 실행 [ docker 설치하공, cmd 창에서 실행!!!]

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

 

historySVC.js

import { createServer } from 'http'
import levelup from 'levelup'
import leveldown from 'leveldown'
import  fs from 'fs'
import JSONStream from 'JSONStream'
import amqp from 'amqplib'
import timestamp from 'time-stamp'

/**
 * Chat history service.
 *
 * Consumes every message routed from the `chat` fanout exchange into the
 * `chat_history` queue, stores it in the LevelDB database at `./msgHistory`,
 * and serves all stored values as a JSON array over HTTP on port 8090.
 *
 * @returns {Promise<void>} Resolves once the consumer and HTTP server are set
 *   up; rejects if connecting to or configuring the broker fails.
 */
async function main(){
    const db = levelup(leveldown('./msgHistory'));

    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
    await channel.assertExchange('chat', 'fanout');
    // assertQueue() returns a promise. Without `await`, `queue` was destructured
    // from the promise object itself and was therefore undefined.
    const { queue } = await channel.assertQueue('chat_history');
    await channel.bindQueue(queue, 'chat');

    // Per-process counter that keeps keys unique within the same millisecond.
    let sequence = 0;

    await channel.consume(queue, async msg => {
        // NOTE(review): amqplib is documented to deliver null when the
        // consumer is cancelled by the broker — confirm.
        if (msg === null) {
            return;
        }
        try {
            const content = msg.content.toString();
            console.log(`Saving message: ${content}`);
            // NOTE(review): time-stamp's default format appears to be day
            // resolution (YYYY-MM-DD), which would make every message of a day
            // overwrite the same key. Use a sortable millisecond timestamp
            // plus a counter instead.
            const key = `${new Date().toISOString()}-${String(sequence).padStart(6, '0')}`;
            sequence += 1;
            await db.put(key, content);
            channel.ack(msg);
        } catch (err) {
            // Leave the message unacknowledged and log the failure rather than
            // producing an unhandled rejection.
            console.error('Failed to save message:', err);
        }
    });

    createServer((req, res) => {
        res.writeHead(200);
        // Stream all stored values to the response as a JSON array.
        db.createValueStream()
            .pipe(JSONStream.stringify())
            .pipe(res);
    }).listen(8090, () => console.log("Server Started"));
}

main().catch(err => console.error(err));

amqp 사용

서버(e7e/index4.js)

import { createServer } from 'http'
import levelup from 'levelup'
import leveldown from 'leveldown'
import  fs from 'fs'
import JSONStream from 'JSONStream'
import amqp from 'amqplib'
import timestamp from 'time-stamp'

// NOTE(review): this listing is labelled index4.js and is started as
// `node index4.js <port>`, but its code is identical to historySVC.js: it never
// reads the port argument, has no WebSocket server, and never publishes to the
// `chat` exchange. The intended chat-server listing appears to be missing.

/**
 * Consumes every message routed from the `chat` fanout exchange into the
 * `chat_history` queue, stores it in the LevelDB database at `./msgHistory`,
 * and serves all stored values as a JSON array over HTTP on port 8090.
 *
 * @returns {Promise<void>} Resolves once the consumer and HTTP server are set
 *   up; rejects if connecting to or configuring the broker fails.
 */
async function main(){
    const db = levelup(leveldown('./msgHistory'));

    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
    await channel.assertExchange('chat', 'fanout');
    // assertQueue() returns a promise. Without `await`, `queue` was destructured
    // from the promise object itself and was therefore undefined.
    const { queue } = await channel.assertQueue('chat_history');
    await channel.bindQueue(queue, 'chat');

    // Per-process counter that keeps keys unique within the same millisecond.
    let sequence = 0;

    await channel.consume(queue, async msg => {
        // NOTE(review): amqplib is documented to deliver null when the
        // consumer is cancelled by the broker — confirm.
        if (msg === null) {
            return;
        }
        try {
            const content = msg.content.toString();
            console.log(`Saving message: ${content}`);
            // NOTE(review): time-stamp's default format appears to be day
            // resolution (YYYY-MM-DD), which would make every message of a day
            // overwrite the same key. Use a sortable millisecond timestamp
            // plus a counter instead.
            const key = `${new Date().toISOString()}-${String(sequence).padStart(6, '0')}`;
            sequence += 1;
            await db.put(key, content);
            channel.ack(msg);
        } catch (err) {
            // Leave the message unacknowledged and log the failure rather than
            // producing an unhandled rejection.
            console.error('Failed to save message:', err);
        }
    });

    createServer((req, res) => {
        res.writeHead(200);
        // Stream all stored values to the response as a JSON array.
        db.createValueStream()
            .pipe(JSONStream.stringify())
            .pipe(res);
    }).listen(8090, () => console.log("Server Started"));
}

main().catch(err => console.error(err));
node index4.js 9010

node index4.js 9020

node historySVC.js

 

========================>

Stream 이용 — 서버(e7e/index5.js)

import { createServer } from 'http'
import staticHandler from 'serve-handler'
import WebSocket, { WebSocketServer } from 'ws'
import Redis from 'ioredis'

// General-purpose Redis connection (XADD / XRANGE).
const redisClient = new Redis();
// Second connection, used only by the blocking XREAD loop.
const redisClientXread = new Redis();

// HTTP server: every request is handed to serve-handler with `e7e` as its
// `public` option.
const server = createServer((req, res) => {
    return staticHandler(req, res, { public: 'e7e' });
});

const wss = new WebSocketServer({ server });

// For every new client: append its messages to the `chat_stream` stream, and
// replay the stream's existing entries to it once.
wss.on('connection', async client => {
    console.log('Client Connected');
    client.on('message', msg => {
        console.log(`Message: ${msg}`);
        // xadd() returns a promise; log a failure instead of leaving the
        // rejection unhandled.
        redisClient.xadd('chat_stream', '*', 'message', msg)
            .catch(err => console.error('Failed to append to chat_stream:', err));
    });

    try {
        const logs = await redisClient.xrange('chat_stream', '-', '+');
        console.log("check: ", logs);

        // Each entry is [id, [fieldName, value]]; only the value is sent.
        for (const [, [, message]] of logs) {
            // The client may have disconnected while XRANGE was pending.
            if (client.readyState !== WebSocket.OPEN) {
                break;
            }
            client.send(message);
        }
    } catch (err) {
        // An async listener's rejection is otherwise unhandled.
        console.error('Failed to replay chat history:', err);
    }
});


/**
 * Sends a message to each client of `wss` that is in the OPEN state.
 *
 * @param {*} msg - Payload to relay; converted with `toString('utf-8')` before
 *   each send.
 */
function broadcast(msg) {
    for (const target of wss.clients) {
        if (target.readyState !== WebSocket.OPEN) {
            continue;
        }
        console.log("서버:", msg);
        target.send(msg.toString('utf-8'));
    }
}

// ID passed to the next XREAD; starts as '$' and is advanced to the ID of
// every entry that has been broadcast.
let lastRecordId = '$';

/**
 * Endless consumer loop: blocks on XREAD for `chat_stream` entries after
 * `lastRecordId`, broadcasts each entry's message, and records the entry's ID
 * as the new `lastRecordId`.
 *
 * @returns {Promise<never>} Never resolves; rejects if a read fails.
 */
async function processStreamMessage() {
    for (;;) {
        const reply = await redisClientXread.xread('BLOCK', '0', 'STREAMS', 'chat_stream', lastRecordId);
        // reply is [[streamName, entries]]; a single stream is requested.
        const [[, entries]] = reply;
        for (const entry of entries) {
            // entry is [id, [fieldName, value]].
            const [recordId, fields] = entry;
            const message = fields[1];
            console.log(`Message from stream: ${message}`);
            broadcast(message);
            lastRecordId = recordId;
        }
    }
}

processStreamMessage().catch((err) => {
    console.error(err);
});

// Port: first CLI argument, or 8272 when that argument is absent or empty.
const listenPort = process.argv[2] || 8272;
server.listen(listenPort);

확인

docker run --name myredis -d -p 6379:6379 redis  ## 실행이 안되어 있다면

node index5.js 9004

node index5.js 9104

 

 

======================>

작업배포 패턴

generateTask.js

/**
 * Lazily splits a brute-force search space into contiguous index batches.
 *
 * The search space is every string of length 1..maxWordLength over `alphabet`,
 * addressed by a 1-based index. Each yielded task covers the inclusive index
 * range [batchStart, batchEnd].
 *
 * @param {string} searchHash - Hash to look for; copied into every task.
 * @param {string} alphabet - Characters candidates are built from; copied into
 *   every task.
 * @param {number|string} maxWordLength - Maximum candidate length (a numeric
 *   string, e.g. straight from process.argv, is accepted).
 * @param {number|string} batchSize - Number of indexes per task; must be a
 *   positive integer (numeric strings are accepted).
 * @yields {{searchHash: string, alphabet: string, batchStart: number, batchEnd: number}}
 * @throws {RangeError} If batchSize is not a positive integer.
 */
export function* generateTasks(searchHash,alphabet,maxWordLength,batchSize){
    // Coerce once: a string batchSize would otherwise be concatenated in
    // `batchStart + batchSize`, and a value below 1 would never advance
    // batchStart, leaving the generator in an endless loop.
    const size = Number(batchSize);
    if (!Number.isInteger(size) || size < 1) {
        throw new RangeError(`batchSize must be a positive integer, got ${batchSize}`);
    }

    // Total number of candidates: sum of alphabet.length^n for n = 1..maxWordLength.
    let nVariations = 0;
    for (let n = 1; n <= maxWordLength; n++) {
        nVariations += alphabet.length ** n;
    }

    console.log(`Finding the hashsum source string over ${nVariations} possible variations`);

    for (let batchStart = 1; batchStart <= nVariations; batchStart += size) {
        yield {
            searchHash,
            alphabet,
            batchStart,
            // The last batch is clamped to the end of the search space.
            batchEnd: Math.min(batchStart + size - 1, nVariations),
        };
    }
}

producer.js

import zmq from 'zeromq'
import delay from 'delay'
import { generateTasks } from './generateTask.js'

// Characters the candidate strings are built from.
const ALPHABET = 'abcdefghijklmnopqrstuvwxyz';
// Number of candidate indexes per task.
const BATCH_SIZE = 10000;

// CLI: node producer.js <maxLength> <searchHash>
const maxLength = process.argv[2];
const searchHash = process.argv[3];

/**
 * Binds a ZeroMQ PUSH socket on port 5016 and sends every generated task over
 * it as a JSON string, one after another.
 *
 * @returns {Promise<void>}
 */
async function main() {
    const ventilator = new zmq.Push();
    await ventilator.bind('tcp://*:5016');
    // One-second pause before the first task is sent — presumably to give the
    // workers time to connect; confirm.
    await delay(1000);

    const generatorObj = generateTasks(searchHash, ALPHABET, maxLength, BATCH_SIZE);
    console.log("check: ", generatorObj);

    for (const task of generatorObj) {
        const payload = JSON.stringify(task);
        await ventilator.send(payload);
    }
}

main().catch((err) => {
    console.error(err);
});

processTask.js

import isv from 'indexed-string-variation'
import { createHash } from 'crypto'

/**
 * Searches one batch of candidate strings for the one whose SHA-1 hex digest
 * equals `task.searchHash`.
 *
 * @param {{alphabet: string, batchStart: number, batchEnd: number, searchHash: string}} task
 *   Batch description; indexes batchStart..batchEnd (inclusive) are tried.
 * @returns {string|undefined} The matching word, or undefined when no
 *   candidate in the batch matches.
 */
export function processTask(task){
    // Maps a candidate index to its string over the task's alphabet.
    const toWord = isv.generator(task.alphabet);
    console.log(`Processing from  
          ${toWord(task.batchStart)} (${task.batchStart})
          to ${toWord(task.batchEnd)} (${task.batchEnd})
    `);

    let idx = task.batchStart;
    while (idx <= task.batchEnd) {
        const candidate = toWord(idx);
        const digest = createHash('sha1').update(candidate).digest('hex');
        if (digest === task.searchHash) {
            return candidate;
        }
        idx++;
    }
    return undefined;
}

worker.js

import zmq from 'zeromq'
import { processTask } from './processTask.js'

/**
 * Worker loop: pulls tasks from the ventilator on port 5016, processes each
 * one, and pushes a "Found: <word>" message to the sink on port 5017 whenever
 * processTask() returns a truthy result.
 *
 * @returns {Promise<void>}
 */
async function main(){
    const fromVentilator = new zmq.Pull();
    const toSink = new zmq.Push();

    fromVentilator.connect('tcp://localhost:5016');
    toSink.connect('tcp://localhost:5017');

    for await (const rawMessage of fromVentilator) {
        const task = JSON.parse(rawMessage.toString());
        const found = processTask(task);
        // Nothing to report for batches without a match.
        if (!found) {
            continue;
        }
        console.log(`Found! => ${found}`);
        await toSink.send(`Found: ${found}`);
    }
}

main().catch((err) => {
    console.error(err);
});

collector.js

import zmq from 'zeromq'

/**
 * Result collector: binds a ZeroMQ PULL socket on port 5017 and logs every
 * message it receives.
 *
 * @returns {Promise<void>}
 */
async function main(){
    const sink = new zmq.Pull();
    await sink.bind('tcp://*:5017');

    for await (const rawMessage of sink) {
        const text = rawMessage.toString();
        console.log(`Message from worker: `, text);
    }
}

main().catch((err) => {
    console.error(err);
});

화긴!!!

node worker.js

node worker.js

node collector.js

node producer.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

 

'NodeJS' 카테고리의 다른 글

관련글 더보기