티스토리 뷰

반응형

먼저 설치

pip install locust, websockets

파일 작성(locustfile.py)

  • 나는 인증을 헤더로 하게 해놔서 헤더 추가함
import time
import json
import asyncio
from locust import User, task, between
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP
from locust.exception import LocustError
import websockets


class WebSocketClient:
    def __init__(self, url, headers):
        self.url = url
        self.headers = headers
        self.connection = None

    async def connect(self):
        self.connection = await websockets.connect(self.url, extra_headers=self.headers)

    async def send_message(self, message):
        await self.connection.send(message)

    async def receive_message(self):
        return await self.connection.recv()

    async def close(self):
        if self.connection:
            await self.connection.close()


class WebSocketUser(User):
    wait_time = between(1, 3)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client = None

    async def on_start(self):
        headers = {
            "key": "jwt? or remove header",
        }
        self.client = WebSocketClient("wss://your-host", headers)
        await self.client.connect()

    async def on_stop(self):
        await self.client.close()

    @task
    async def websocket_task(self):
        try:
            #여기에 작업
            pass
        except Exception as e:
            print(f"Error during WebSocket communication: {e}")

    def run(self):
        asyncio.run(self.start_test())

    async def start_test(self):
        try:
            await self.on_start()
            while self.environment.runner.state not in (
                STATE_STOPPING,
                STATE_STOPPED,
                STATE_CLEANUP,
            ):
                await self.websocket_task()
                await asyncio.sleep(self.wait_time())
        except LocustError as e:
            print(f"Locust error: {e}")
        finally:
            await self.on_stop()

locust 서버 실행

locust -f locustfile.py

localhost:8089

일단 간단하게 연결 되는지 확인했고, 추가로 작성 예정임

 

테스트 코드

# locustfile.py

import sys
import os
import json
import time
import random
import string
from datetime import datetime
from locust import User, task, between
import websocket
from sqlalchemy.orm import sessionmaker

sys.path.append(
    os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "ws_server"))
)

# 필요한 모듈 가져오기
from common.model_server import HHHHH
from common.utils import generate_credentials
from common.db_init import server_engine


class WebSocketUser(User):
    wait_time = between(1, 5)
    uri = "ws://127.0.0.1:6410/ws"
  	key = ""
    key_ex = ""

    def create_hospital(self):
        Session = sessionmaker(bind=server_engine)
        db = Session()
        try:
            self.a, self.s = generate_credentials()
            self.b = "".join(random.choices(string.digits, k=10))
            hospital = HHHHH(
                name="locust",
                h1=self.b,
                key=self.a,
                key2=self.s,
            )
            db.add(hospital)
            db.commit()
        finally:
            db.close()

    def on_start(self):
        self.create_hospital()

        headers = {
            "key": f"{self.a}",
            "key2": f"{self.s}",
            "h1": f"{self.b}",
        }

        header_list = [f"{k}: {v}" for k, v in headers.items()]

        websocket.enableTrace(False)  # 필요에 따라 True로 설정하여 디버깅 정보 출력

        try:
            self.ws = websocket.WebSocket()
            self.ws.connect(self.uri, header=header_list, timeout=10)
            self.receive_token()
        except Exception as e:
            print(f"Error during connection: {e}")
            self.ws = None

    def on_stop(self):
        Session = sessionmaker(bind=server_engine)
        db = Session()
        try:
            hospital = db.query(HHHHH).filter_by(h1=self.b).first()
            if hospital:
                db.delete(hospital)
                db.commit()
        finally:
            db.close()

        if self.ws:
            self.ws.close()

    def receive_token(self):
        try:
            response = self.ws.recv()
            response_data = json.loads(response)
            if response_data.get("t") == "a":
                self.key = response_data["j"].get("t")
                self.key_ex = response_data["j"].get("t_ex")
                print(f"t r: {self.key}")
            else:
                print("Unexpected message type received.")
        except Exception as e:
            print(f"Error receiving token: {e}")
            self.ws.close()
            self.ws = None

    @task
    def websocket_task(self):
        if not self.ws:
            print("WebSocket connection not established.")
            # self.on_start()
            return

        msg = {
            "b": self.b,
            "t": self.token,
            "m": "1",
            "mt": "test_locust",
            "p": "00",
            "r": self.b,
            "d": "1",

        }

        start_time = time.time()

        try:
         
            self.ws.send(json.dumps(msg))
            response = self.ws.recv()
            latency = (time.time() - start_time) * 1000

            print(
                f"WebSocket Task Success - Time: {latency:.2f} ms, Response: {response}"
            )
            # print(f"Received: {response}")
            # print(f"Token used: {self.token}")

        except Exception as e:
            latency = (time.time() - start_time) * 1000 

            print(f"WebSocket Task Failure - Time: {latency:.2f} ms, Error: {e}")
            print(f"Error: {e}")
            self.on_start()

 

그냥 변수 이름 아무렇게나 바꿨고, 흐름만 보면 됨

on_start > task > on_stop

큰 문제 없이 되긴함, 아니 그냥 쓰면 안됨

jmeter 사용해보려고 보는중

 
댓글

티스토리 방명록

최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday