사용한 node 버전은 18.15.0 이디용!
import 어쩌구 from '파일명.js' 를 사용하기 위해서
package.json 파일에 "type":"module" 추가 해주삼
서버(e7e/index.js)
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import WebSocket, { WebSocketServer } from 'ws'
// Every HTTP request is delegated to serve-handler, configured with public: 'e7e'.
const server = createServer(function onRequest(request, response){
  return staticHandler(request, response, {public: 'e7e'})
})
// The WebSocket server is attached to the HTTP server created above.
const wss = new WebSocketServer({ server })
wss.on('connection', function onConnection(socket){
  console.log('Client Connected')
  // Each incoming message is logged and then relayed through broadCast().
  socket.on('message', function onMessage(payload){
    console.log(`Message: ${payload}`)
    broadCast(payload)
  })
})
/**
 * Relay a message to every client in wss.clients whose connection is open.
 *
 * @param {*} msg - Value to relay; its `toString('utf-8')` result is what gets sent.
 */
function broadCast(msg){
  wss.clients.forEach(peer => {
    // Skip sockets that are not in the OPEN state.
    if(peer.readyState !== WebSocket.OPEN){
      return;
    }
    console.log("서버:",msg);
    const text = msg.toString('utf-8');
    peer.send(text);
  });
}
// Listen on the port given as the first CLI argument; fall back to 8272 when it is absent.
server.listen(process.argv[2] || 8272)
클라이언트(e7e/index.html)
<!DOCTYPE html>
<html lang="ko">
<head>
<meta charset="UTF-8">
<title>Chat With E7E</title>
</head>
<body>
메세징:
<div id="message"></div>
<form id="msgForm">
<input type="text" id="msgBox" placeholder="메세지 쓰삼">
<button>Send</button>
</form>
<script>
// DOM nodes used by the chat UI.
const message = document.querySelector("#message");
const msgForm = document.querySelector("#msgForm");
const msgBox = document.querySelector("#msgBox");
// Follow the page's scheme so the socket also works when the page is served over HTTPS.
const wsScheme = window.location.protocol === "https:" ? "wss" : "ws";
const ws = new WebSocket(`${wsScheme}://${window.location.host}`);
// Append each received message to the #message container as its own <div>.
ws.onmessage = (msg)=>{
  const msgDiv = document.createElement("div");
  console.log(msg);
  // textContent instead of innerHTML: the text comes from other chat users and
  // must not be parsed as markup (otherwise any client could inject script/HTML).
  msgDiv.textContent = msg.data;
  message.appendChild(msgDiv);
}
// Take the event from the handler parameter rather than the implicit global `event`.
msgForm.addEventListener("submit",(event)=>{
  event.preventDefault();
  // send() throws while the socket is still connecting, so keep the text until it is open.
  if (ws.readyState !== WebSocket.OPEN) {
    return;
  }
  ws.send(msgBox.value);
  msgBox.value = "";
})
</script>
</body>
</html>
node index.js
========================>
Broker(Redis)를 사용해, 서로 다른 서버끼리 중앙집중식 Pub/Sub(발행/구독) 패턴으로 통신
먼저 redis 서버 설치 및 실행
Windows는 공식적으로 지원이 안 돼서 docker나 wsl2 사용
여기서는 wsl2 사용해 봄
# key 설치
curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg
# repository 추가
echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list
# 패키지 업데이트 와 업그레이드
sudo apt update && sudo apt upgrade -y
# 설치
sudo apt install redis
# 시작
sudo service redis-server start
# 접속 확인
redis-cli
127.0.0.1:6379> ping
PONG
docker롱!(위에 했으면 할 필요 업디용)
docker run --name myredis -d -p 6379:6379 redis
docker ps -a ## 상태확인
docker logs myredis ## 문제시 로그 확인
서버(e7e/index2.js)
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import WebSocket, { WebSocketServer } from 'ws'
import Redis from 'ioredis'
// Two separate Redis connections: one only subscribes, the other only publishes.
// (A connection that has entered subscriber mode cannot be used for PUBLISH.)
const redisSub = new Redis();
const redisPub = new Redis();
// Every HTTP request is delegated to serve-handler, configured with public: 'e7e'.
const server = createServer((req,res) =>{
  return staticHandler(req, res, {public: 'e7e'})
})
const wss = new WebSocketServer({ server })
wss.on('connection', client => {
  console.log('Client Connected')
  client.on('message', msg => {
    console.log(`Message: ${msg}`)
    // Messages are not sent to local clients directly; they go through the
    // 'chat_message' channel and come back via the subscriber below.
    // publish() returns a promise: handle its rejection so a Redis failure is
    // logged instead of surfacing as an unhandled rejection.
    redisPub.publish('chat_message',msg).catch(err => console.error(err))
  })
})
// subscribe() also returns a promise; log a failed subscription instead of leaving it unhandled.
redisSub.subscribe('chat_message').catch(err => console.error(err))
// Relay every message received on the channel to all open WebSocket clients of this process.
redisSub.on('message',(channel,msg) => {
  console.log("채널확인: ",channel);
  for(const client of wss.clients){
    if(client.readyState === WebSocket.OPEN){
      client.send(msg.toString('utf-8'))
    }
  }
})
// Listen on the port given as the first CLI argument; fall back to 8272 when it is absent.
server.listen(process.argv[2] || 8272)
클라이언트(e7e/index.html) 변화 없음
서버 포트 바꿔서 여러개 돌려서 잘 되는 지 테스통!
node index2.js 9004
node index2.js 9014
node index2.js 9024
========================>
zeromq를 사용하여 P2P 방식의 Pub/Sub 패턴으로 전환
서버(e7e/index3.js)
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import WebSocket, { WebSocketServer } from 'ws'
import yargs from 'yargs/yargs'
import { hideBin } from 'yargs/helpers'
import zmq from 'zeromq'
// Parsed CLI flags; this file reads argv.http, argv.pub and argv.sub.
const argv = yargs(hideBin(process.argv)).argv
// Every HTTP request is delegated to serve-handler, configured with public: 'e7e'.
const server = createServer(function onRequest(request, response){
  return staticHandler(request, response, {public: 'e7e'})
})
// ZeroMQ publisher socket; stays undefined until initializeSockets() assigns it.
let pubSocket;
/**
 * Bind this server's ZeroMQ publisher, connect a subscriber to the peer
 * publishers, and relay every message received from a peer to the local
 * WebSocket clients through broadCast().
 *
 * Reads argv.pub (port to bind on 127.0.0.1) and argv.sub (one port or a list
 * of ports to connect to). The for-await loop runs for as long as the
 * subscriber yields messages, so the returned promise stays pending meanwhile.
 *
 * @returns {Promise<void>} Rejects if binding the publisher fails.
 */
async function initializeSockets(){
  pubSocket = new zmq.Publisher();
  await pubSocket.bind(`tcp://127.0.0.1:${argv.pub}`);
  const subSocket = new zmq.Subscriber()
  // argv.sub may be a single value or an array; when it is absent there are no
  // peers, so do not try to connect to "tcp://127.0.0.1:undefined".
  const subPorts = argv.sub == null ? [] : [].concat(argv.sub)
  for(const port of subPorts){
    console.log(`Subscribing to ${port}`)
    subSocket.connect(`tcp://127.0.0.1:${port}`)
  }
  subSocket.subscribe('chat')
  for await ( const [msg] of subSocket){
    console.log(`Message from another server: ${msg}`)
    // Peers publish `chat ${text}`. Strip only the topic up to the first space
    // so that text containing spaces is relayed in full, not cut at its first word.
    const text = msg.toString()
    const separatorAt = text.indexOf(' ')
    broadCast(separatorAt === -1 ? '' : text.slice(separatorAt + 1))
  }
}
// Start the ZeroMQ sockets. The promise is handled here so that a failed
// bind/connect is logged instead of becoming an unhandled rejection.
initializeSockets().catch(err => console.error(err));
// The WebSocket server is attached to the HTTP server.
const wss = new WebSocketServer({ server })
wss.on('connection', client => {
  console.log('Client Connected')
  client.on('message', async msg => {
    console.log(`Message: ${msg}`)
    // Deliver to this process's own clients first, then publish to the peers.
    broadCast(msg)
    try {
      await pubSocket.send(`chat ${msg}`)
    } catch (err) {
      // pubSocket is still undefined until initializeSockets() has created it,
      // and send() itself may reject; neither should take the server down.
      console.error(err)
    }
  })
})
/**
 * Relay a message to every client in wss.clients whose connection is open.
 *
 * @param {*} msg - Value to relay; its `toString('utf-8')` result is what gets sent.
 */
function broadCast(msg){
  wss.clients.forEach(peer => {
    // Skip sockets that are not in the OPEN state.
    if(peer.readyState !== WebSocket.OPEN){
      return;
    }
    console.log("서버:",msg);
    const text = msg.toString('utf-8');
    peer.send(text);
  });
}
// Listen on the port given with --http; fall back to 8272 when the flag is absent.
server.listen(argv.http || 8272)
아래처럼 서버 여러 개 실행 후 테스통 (클라이언트는 동일: e7e/index.html)
node index3.js --http 8080 --pub 5000 --sub 5001 --sub 5002
node index3.js --http 8081 --pub 5001 --sub 5000 --sub 5002
node index3.js --http 8082 --pub 5002 --sub 5000 --sub 5001
========================>
메세지 Queue 사용도 해 볼까용
rabbitmq 서버 실행 [ docker 설치하공, cmd 창에서 실행!!!]
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
historySVC.js
import { createServer } from 'http'
import levelup from 'levelup'
import leveldown from 'leveldown'
import fs from 'fs'
import JSONStream from 'JSONStream'
import amqp from 'amqplib'
import timestamp from 'time-stamp'
/**
 * Chat-history service.
 *
 * Consumes messages from the 'chat_history' queue (bound to the 'chat' fanout
 * exchange), stores each one in the LevelDB database at './msgHistory', and
 * serves all stored values as a JSON array over HTTP on port 8090.
 *
 * @returns {Promise<void>} Rejects if connecting to the broker or declaring
 *   the exchange/queue fails.
 */
async function main(){
  const db = levelup(leveldown('./msgHistory'))
  const connection = await amqp.connect('amqp://localhost')
  const channel = await connection.createChannel()
  await channel.assertExchange('chat','fanout')
  // assertQueue() returns a promise; without `await` the destructured `queue`
  // is undefined.
  const { queue } = await channel.assertQueue('chat_history')
  await channel.bindQueue(queue, 'chat')
  // Per-process counter that keeps keys unique and ordered within one millisecond.
  let sequence = 0
  channel.consume(queue, async msg => {
    // amqplib invokes the callback with null when the broker cancels the consumer.
    if (msg === null) {
      return
    }
    const content = msg.content.toString()
    console.log(`Saving message: ${content}`)
    try {
      // time-stamp's default pattern is date-only, so using it as the key made
      // every message of the same day overwrite the previous one. An ISO
      // timestamp plus the counter is unique and sorts chronologically.
      const key = `${new Date().toISOString()}-${String(sequence).padStart(6, '0')}`
      sequence = (sequence + 1) % 1000000
      await db.put(key, content)
      channel.ack(msg)
    } catch (err) {
      // Not acknowledged on failure, so the message is not lost silently.
      console.error(err)
    }
  })
  createServer((req,res) => {
    res.writeHead(200)
    // Stream every stored value to the response as one JSON array.
    db.createValueStream()
      .pipe(JSONStream.stringify())
      .pipe(res)
  }).listen(8090, () => console.log("Server Started"))
}
// Run the service; a startup failure is written with console.error.
main().catch(err => console.error(err))
amqp 사용
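서버(e7e/index4.js) — 주의: 아래 목록은 위 historySVC.js와 같은 히스토리 서비스 코드가 붙여넣어진 것으로 보이며, 실행 인자로 준 포트(9010, 9020)를 읽지 않고 8090 포트를 사용함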
import { createServer } from 'http'
import levelup from 'levelup'
import leveldown from 'leveldown'
import fs from 'fs'
import JSONStream from 'JSONStream'
import amqp from 'amqplib'
import timestamp from 'time-stamp'
/**
 * Consumes messages from the 'chat_history' queue (bound to the 'chat' fanout
 * exchange), stores each one in the LevelDB database at './msgHistory', and
 * serves all stored values as a JSON array over HTTP on port 8090.
 *
 * NOTE(review): this listing does not read a port from process.argv, although
 * it is started with one — confirm whether a different listing was intended.
 *
 * @returns {Promise<void>} Rejects if connecting to the broker or declaring
 *   the exchange/queue fails.
 */
async function main(){
  const db = levelup(leveldown('./msgHistory'))
  const connection = await amqp.connect('amqp://localhost')
  const channel = await connection.createChannel()
  await channel.assertExchange('chat','fanout')
  // assertQueue() returns a promise; without `await` the destructured `queue`
  // is undefined.
  const { queue } = await channel.assertQueue('chat_history')
  await channel.bindQueue(queue, 'chat')
  // Per-process counter that keeps keys unique and ordered within one millisecond.
  let sequence = 0
  channel.consume(queue, async msg => {
    // amqplib invokes the callback with null when the broker cancels the consumer.
    if (msg === null) {
      return
    }
    const content = msg.content.toString()
    console.log(`Saving message: ${content}`)
    try {
      // time-stamp's default pattern is date-only, so using it as the key made
      // every message of the same day overwrite the previous one. An ISO
      // timestamp plus the counter is unique and sorts chronologically.
      const key = `${new Date().toISOString()}-${String(sequence).padStart(6, '0')}`
      sequence = (sequence + 1) % 1000000
      await db.put(key, content)
      channel.ack(msg)
    } catch (err) {
      // Not acknowledged on failure, so the message is not lost silently.
      console.error(err)
    }
  })
  createServer((req,res) => {
    res.writeHead(200)
    // Stream every stored value to the response as one JSON array.
    db.createValueStream()
      .pipe(JSONStream.stringify())
      .pipe(res)
  }).listen(8090, () => console.log("Server Started"))
}
// Run the service; a startup failure is written with console.error.
main().catch(err => console.error(err))
node index4.js 9010
node index4.js 9020
node historySVC.js
========================>
Redis Stream 이용 — 서버(e7e/index5.js)
import { createServer } from 'http'
import staticHandler from 'serve-handler'
import WebSocket, { WebSocketServer } from 'ws'
import Redis from 'ioredis'
// General-purpose connection (XADD / XRANGE).
const redisClient = new Redis();
// Second connection, used only by the XREAD loop that blocks with BLOCK 0 —
// presumably kept separate so the blocking read does not hold up other commands.
const redisClientXread = new Redis();
// Every HTTP request is delegated to serve-handler, configured with public: 'e7e'.
const server = createServer((req, res) => {
  return staticHandler(req, res, { public: 'e7e' })
})
const wss = new WebSocketServer({ server })
wss.on('connection', async client => {
  console.log('Client Connected')
  client.on('message', msg => {
    console.log(`Message: ${msg}`)
    // xadd() returns a promise; log a failed append instead of leaving the rejection unhandled.
    redisClient.xadd('chat_stream', '*', 'message', msg)
      .catch(err => console.error(err))
  })
  try {
    // Replay the whole stream to the newly connected client.
    const logs = await redisClient.xrange('chat_stream', '-', '+')
    for (const [, [, message]] of logs) {
      // The client may have disconnected while the history was being fetched.
      if (client.readyState !== WebSocket.OPEN) {
        break
      }
      client.send(message)
    }
  } catch (err) {
    // A failed history fetch must not reject this async handler unhandled.
    console.error(err)
  }
})
/**
 * Relay a message to every client in wss.clients whose connection is open.
 *
 * @param {*} msg - Value to relay; its `toString('utf-8')` result is what gets sent.
 */
function broadcast(msg) {
  wss.clients.forEach(peer => {
    // Skip sockets that are not in the OPEN state.
    if (peer.readyState !== WebSocket.OPEN) {
      return;
    }
    console.log("서버:", msg);
    const text = msg.toString('utf-8');
    peer.send(text);
  });
}
// ID passed to XREAD as the read position; starts at '$' and is advanced to
// the ID of each record as it is handled.
let lastRecordId = '$'
/**
 * Endlessly read 'chat_stream' with a blocking XREAD and pass the message of
 * every record to broadcast(), remembering the ID of the last record handled.
 *
 * @returns {Promise<never>} Only settles by rejecting, when xread() or
 *   broadcast() throws.
 */
async function processStreamMessage() {
  for (;;) {
    const reply = await redisClientXread.xread(
      'BLOCK', '0',
      'STREAMS', 'chat_stream', lastRecordId
    )
    // The reply's first element pairs the stream with its list of records.
    const [[, entries]] = reply
    for (const [entryId, [, text]] of entries) {
      console.log(`Message from stream: ${text}`)
      broadcast(text)
      lastRecordId = entryId
    }
  }
}
// Start the stream-reading loop; if it rejects, the error is written with console.error.
processStreamMessage().catch(err => console.error(err))
// Listen on the port given as the first CLI argument; fall back to 8272 when it is absent.
server.listen(process.argv[2] || 8272)
확인
docker run --name myredis -d -p 6379:6379 redis ## 실행이 안되어 있다면
node index5.js 9004
node index5.js 9104
======================>
작업배포 패턴
generateTask.js
/**
 * Lazily split the search space into consecutive index ranges (batches).
 *
 * The search space is every string over `alphabet` with a length from 1 to
 * `maxWordLength`; candidates are addressed by a 1-based index.
 *
 * @param {string} searchHash - Hash being searched for; copied into every task.
 * @param {string} alphabet - Characters the candidate strings are built from.
 * @param {number|string} maxWordLength - Longest candidate length; numeric
 *   strings (e.g. straight from process.argv) are accepted.
 * @param {number|string} batchSize - Number of indexes per task; must be an
 *   integer >= 1.
 * @yields {{searchHash: string, alphabet: string, batchStart: number, batchEnd: number}}
 *   One task per batch; `batchStart` and `batchEnd` are inclusive.
 * @throws {RangeError} If `batchSize` is not an integer >= 1.
 */
export function* generateTasks(searchHash,alphabet,maxWordLength,batchSize){
  // Coerce explicitly: with a string batchSize, `batchStart + batchSize` would
  // concatenate instead of add and produce wrong ranges.
  const maxLength = Number(maxWordLength)
  const size = Number(batchSize)
  // A batch size below 1 would never advance batchStart (endless generator).
  if(!Number.isInteger(size) || size < 1){
    throw new RangeError(`batchSize must be an integer >= 1, got ${batchSize}`)
  }
  let nVariations = 0
  for(let n = 1; n <= maxLength; n++){
    nVariations += alphabet.length ** n
  }
  console.log(`Finding the hashsum source string over ${nVariations} possible variations`)
  for(let batchStart = 1; batchStart <= nVariations; batchStart += size){
    yield {
      searchHash,
      alphabet,
      batchStart,
      // The last batch is clipped to the end of the search space.
      batchEnd: Math.min(batchStart + size - 1, nVariations)
    }
  }
}
producer.js
import zmq from 'zeromq'
import delay from 'delay'
import { generateTasks } from './generateTask.js'
// Characters the candidate strings are built from.
const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
// Number of candidate indexes handed out per task.
const BATCH_SIZE = 10000
// CLI: node producer.js <maxLength> <searchHash>
const [,,maxLength,searchHash] = process.argv
/**
 * Generate the search tasks and push each one, JSON-encoded, through a ZeroMQ
 * PUSH socket bound to tcp://*:5016.
 *
 * @returns {Promise<void>}
 * @throws {Error} If the CLI arguments are missing or maxLength is not a
 *   positive integer.
 */
async function main(){
  // Validate before opening any socket: argv values are strings, and a missing
  // argument would otherwise silently generate no tasks.
  const maxWordLength = Number.parseInt(maxLength, 10)
  if(!Number.isInteger(maxWordLength) || maxWordLength < 1 || !searchHash){
    throw new Error('Usage: node producer.js <maxLength> <searchHash>')
  }
  const ventilator = new zmq.Push()
  await ventilator.bind('tcp://*:5016')
  // NOTE(review): presumably this pause gives the workers time to connect
  // before the first task is pushed — confirm.
  await delay(1000)
  for(const task of generateTasks(searchHash,ALPHABET,maxWordLength,BATCH_SIZE)){
    await ventilator.send(JSON.stringify(task))
  }
}
// Run the producer; a failure is written with console.error.
main().catch(err => console.error(err))
processTask.js
import isv from 'indexed-string-variation'
import { createHash } from 'crypto'
/**
 * Search one batch of candidate strings for the one whose SHA-1 hex digest
 * equals `task.searchHash`.
 *
 * @param {{alphabet: string, batchStart: number, batchEnd: number, searchHash: string}} task
 *   Batch description; `batchStart` and `batchEnd` are inclusive indexes.
 * @returns {string|undefined} The matching string, or undefined when no
 *   candidate in the batch matches.
 */
export function processTask(task){
  const { alphabet, batchStart, batchEnd, searchHash } = task
  // Maps an index to the candidate string for this alphabet.
  const wordAt = isv.generator(alphabet)
  const firstWord = wordAt(batchStart)
  const lastWord = wordAt(batchEnd)
  console.log(`Processing from\n${firstWord} (${batchStart})\nto ${lastWord} (${batchEnd})\n`)
  let cursor = batchStart
  while(cursor <= batchEnd){
    const candidate = wordAt(cursor)
    const hexDigest = createHash('sha1').update(candidate).digest('hex')
    if(hexDigest === searchHash){
      return candidate
    }
    cursor += 1
  }
}
worker.js
import zmq from 'zeromq'
import { processTask } from './processTask.js'
/**
 * Worker loop: pull JSON-encoded tasks from tcp://localhost:5016, run
 * processTask() on each, and push a "Found: ..." message to
 * tcp://localhost:5017 whenever a task yields a result.
 *
 * @returns {Promise<void>} Resolves when the task socket stops yielding messages.
 */
async function main(){
  const taskSource = new zmq.Pull()
  const resultSink = new zmq.Push()
  taskSource.connect('tcp://localhost:5016')
  resultSink.connect('tcp://localhost:5017')
  for await (const frames of taskSource){
    const task = JSON.parse(frames.toString())
    const match = processTask(task)
    // Nothing to report for a batch without a match.
    if(!match){
      continue
    }
    console.log(`Found! => ${match}`)
    await resultSink.send(`Found: ${match}`)
  }
}
// Run the worker; if main() rejects, the error is written with console.error.
main().catch(err => console.error(err))
collector.js
import zmq from 'zeromq'
/**
 * Collector loop: bind a ZeroMQ PULL socket to tcp://*:5017 and print every
 * message received on it.
 *
 * @returns {Promise<void>} Resolves when the socket stops yielding messages.
 */
async function main(){
  const resultSocket = new zmq.Pull()
  await resultSocket.bind('tcp://*:5017')
  // Print one received message.
  const report = frames => console.log(`Message from worker: `, frames.toString())
  for await (const frames of resultSocket){
    report(frames)
  }
}
// Run the collector; if main() rejects, the error is written with console.error.
main().catch(err => console.error(err))
화긴!!!
node worker.js
node worker.js
node collector.js
node producer.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b
Restful용 가짜 data 서비스 json-server 맹글기 (0) | 2024.05.21 |
---|---|
Https 서버 맹글기 (0) | 2023.06.05 |