티스토리 뷰
반응형
먼저 설치
pip install locust websockets
파일 작성(locustfile.py)
- 나는 인증을 헤더로 하게 해놔서 헤더 추가함
import time
import json
import asyncio
from locust import User, task, between
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP
from locust.exception import LocustError
import websockets
class WebSocketClient:
    """Thin async wrapper around a single ``websockets`` connection.

    Stores the target URL and the handshake headers and exposes
    connect / send / receive / close coroutines. ``connection`` is ``None``
    whenever the client is not connected.
    """

    def __init__(self, url, headers):
        """Remember where to connect; no network I/O happens here.

        Args:
            url: WebSocket URL passed to ``websockets.connect``.
            headers: Extra HTTP headers sent with the opening handshake.
        """
        self.url = url
        self.headers = headers
        # Set by connect(); reset to None by close().
        self.connection = None

    async def connect(self):
        """Open the connection, sending ``self.headers`` with the handshake."""
        # Local import keeps this block self-contained.
        import inspect

        # NOTE: newer websockets releases (new asyncio client, default since
        # 14.0) take ``additional_headers``; the legacy client took
        # ``extra_headers``. Pick whichever the installed ``connect`` declares,
        # falling back to the original keyword.
        accepted = inspect.signature(websockets.connect).parameters
        if "additional_headers" in accepted:
            header_kwarg = "additional_headers"
        else:
            header_kwarg = "extra_headers"
        self.connection = await websockets.connect(
            self.url, **{header_kwarg: self.headers}
        )

    def _require_connection(self):
        """Return the live connection or raise a clear error if there is none."""
        if self.connection is None:
            raise ConnectionError("WebSocket is not connected; call connect() first")
        return self.connection

    async def send_message(self, message):
        """Send ``message`` over the open connection.

        Raises:
            ConnectionError: If ``connect()`` has not been called (or the
                client was closed).
        """
        await self._require_connection().send(message)

    async def receive_message(self):
        """Wait for and return the next message from the server.

        Raises:
            ConnectionError: If ``connect()`` has not been called (or the
                client was closed).
        """
        return await self._require_connection().recv()

    async def close(self):
        """Close the connection if one is open; safe to call repeatedly."""
        # Drop the reference first so the client never keeps pointing at a
        # closed connection, even if close() itself raises.
        connection, self.connection = self.connection, None
        if connection is not None:
            await connection.close()
class WebSocketUser(User):
    """Locust user that drives one WebSocket connection from an asyncio loop.

    The lifecycle hooks and the task are coroutines, so ``run`` is overridden
    to execute them with ``asyncio.run`` via ``start_test`` instead of relying
    on the base class's runner.
    """

    # Each simulated user pauses 1-3 seconds between task executions.
    wait_time = between(1, 3)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Created in on_start(); stays None until then.
        self.client = None

    async def on_start(self):
        """Create the WebSocket client and open the connection."""
        headers = {
            "key": "jwt? or remove header",
        }
        self.client = WebSocketClient("wss://your-host", headers)
        await self.client.connect()

    async def on_stop(self):
        """Close the connection, if a client was ever created."""
        # start_test() always calls this from ``finally``; guard against the
        # client never having been created.
        if self.client is not None:
            await self.client.close()

    @task
    async def websocket_task(self):
        """Placeholder task; errors are printed rather than propagated."""
        try:
            # Put the actual work here.
            pass
        except Exception as e:
            print(f"Error during WebSocket communication: {e}")

    def run(self):
        """Entry point for this user: run the whole test loop under asyncio."""
        asyncio.run(self.start_test())

    async def start_test(self):
        """Connect, repeat the task until the runner is stopping, then clean up."""
        stopping_states = (
            STATE_STOPPING,
            STATE_STOPPED,
            STATE_CLEANUP,
        )
        try:
            await self.on_start()
            while self.environment.runner.state not in stopping_states:
                await self.websocket_task()
                await asyncio.sleep(self.wait_time())
        except LocustError as e:
            print(f"Locust error: {e}")
        finally:
            await self.on_stop()
locust 서버 실행
locust -f locustfile.py
브라우저에서 http://localhost:8089 접속
일단 간단하게 연결 되는지 확인했고, 추가로 작성 예정임
테스트 코드
# locustfile.py
import sys
import os
import json
import time
import random
import string
from datetime import datetime
from locust import User, task, between
import websocket
from sqlalchemy.orm import sessionmaker
# Add the sibling "ws_server" directory (one level above this file's directory)
# to the import path; presumably this is where the "common" package imported
# below lives -- TODO confirm.
sys.path.append(
os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "ws_server"))
)
# Import the required modules
from common.model_server import HHHHH
from common.utils import generate_credentials
from common.db_init import server_engine
class WebSocketUser(User):
    """Locust user that talks to the WebSocket server with ``websocket-client``.

    Flow: ``on_start`` inserts a throw-away ``HHHHH`` row, connects with that
    row's credentials as headers and reads the token the server sends first;
    ``websocket_task`` sends one JSON message and waits for one reply;
    ``on_stop`` deletes the row and closes the socket.

    Results are only printed; nothing is reported to Locust's statistics.
    """

    wait_time = between(1, 5)
    uri = "ws://127.0.0.1:6410/ws"
    key = ""
    key_ex = ""
    # Class-level defaults so on_stop()/websocket_task() can safely test these
    # even when on_start() failed before assigning them.
    ws = None
    a = None
    s = None
    b = None

    def create_hospital(self):
        """Insert a fresh ``HHHHH`` row and keep its credentials on ``self``."""
        Session = sessionmaker(bind=server_engine)
        db = Session()
        try:
            self.a, self.s = generate_credentials()
            self.b = "".join(random.choices(string.digits, k=10))
            hospital = HHHHH(
                name="locust",
                h1=self.b,
                key=self.a,
                key2=self.s,
            )
            db.add(hospital)
            db.commit()
        finally:
            db.close()

    def on_start(self):
        """Create the test row, then open the WebSocket connection."""
        self.create_hospital()
        self._connect()

    def _connect(self):
        """Open the socket with the current credentials and read the token.

        On any failure the error is printed and ``self.ws`` is left as None.
        """
        headers = {
            "key": f"{self.a}",
            "key2": f"{self.s}",
            "h1": f"{self.b}",
        }
        header_list = [f"{k}: {v}" for k, v in headers.items()]
        # Set to True to print debugging information when needed.
        websocket.enableTrace(False)
        try:
            self.ws = websocket.WebSocket()
            self.ws.connect(self.uri, header=header_list, timeout=10)
            self.receive_token()
        except Exception as e:
            print(f"Error during connection: {e}")
            self._close_socket()

    def _close_socket(self):
        """Best-effort close of the current socket; always leaves ``ws`` None."""
        ws, self.ws = self.ws, None
        if ws is None:
            return
        try:
            ws.close()
        except Exception as e:
            print(f"Error closing WebSocket: {e}")

    def on_stop(self):
        """Delete the test row (if one was created) and close the socket."""
        try:
            if self.b is not None:
                Session = sessionmaker(bind=server_engine)
                db = Session()
                try:
                    hospital = db.query(HHHHH).filter_by(h1=self.b).first()
                    if hospital:
                        db.delete(hospital)
                        db.commit()
                finally:
                    db.close()
        finally:
            # Close the socket even if the database cleanup raised.
            self._close_socket()

    def receive_token(self):
        """Read the first server message and store the token it carries.

        A message whose ``"t"`` field is ``"a"`` is expected; its ``"j"``
        object supplies ``"t"`` (stored as ``self.key``) and ``"t_ex"``
        (stored as ``self.key_ex``). Any error closes the socket.
        """
        try:
            response = self.ws.recv()
            response_data = json.loads(response)
            if response_data.get("t") == "a":
                self.key = response_data["j"].get("t")
                self.key_ex = response_data["j"].get("t_ex")
                print(f"t r: {self.key}")
            else:
                print("Unexpected message type received.")
        except Exception as e:
            print(f"Error receiving token: {e}")
            self._close_socket()

    @task
    def websocket_task(self):
        """Send one message, wait for one reply and print the round-trip time."""
        if not self.ws:
            print("WebSocket connection not established.")
            return
        msg = {
            "b": self.b,
            # The token is stored in ``self.key`` by receive_token();
            # ``self.token`` was never assigned anywhere.
            "t": self.key,
            "m": "1",
            "mt": "test_locust",
            "p": "00",
            "r": self.b,
            "d": "1",
        }
        # perf_counter() is monotonic, so the measured interval cannot be
        # skewed by wall-clock adjustments.
        started = time.perf_counter()
        try:
            self.ws.send(json.dumps(msg))
            response = self.ws.recv()
            latency = (time.perf_counter() - started) * 1000
            print(
                f"WebSocket Task Success - Time: {latency:.2f} ms, Response: {response}"
            )
        except Exception as e:
            latency = (time.perf_counter() - started) * 1000
            print(f"WebSocket Task Failure - Time: {latency:.2f} ms, Error: {e}")
            print(f"Error: {e}")
            # Reconnect with the credentials already in the database. Calling
            # on_start() here would insert another row and orphan the old one,
            # because on_stop() only deletes the row matching the latest self.b.
            self._close_socket()
            self._connect()
그냥 변수 이름 아무렇게나 바꿨고, 흐름만 보면 됨
on_start > task > on_stop
큰 문제 없이 동작하긴 하지만, 이 코드를 그대로 가져다 쓰면 안 됨
jmeter 사용해보려고 보는중
'Python' 카테고리의 다른 글
JMeter WebSocket 부하 테스트(fastapi, nginx) (3) | 2024.11.14 |
---|---|
pymssql 연결 안 됨 (0) | 2024.09.02 |
[Python] onedrive file upload example(파이썬 원 드라이브 파일 업로드 예제) (0) | 2024.02.08 |
python class 생성 시 사용한 변수의 값을 내부에서 참조함 (0) | 2023.12.02 |
ModuleNotFoundError: No module named 'MySQLdb' (0) | 2023.08.09 |
댓글
티스토리 방명록
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
Blog is powered by
Tistory / Designed by
Tistory
Contact: j0n9m1n1@gmail.com
Contact: j0n9m1n1@gmail.com