-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_multi_user.py
More file actions
169 lines (139 loc) · 5.25 KB
/
test_multi_user.py
File metadata and controls
169 lines (139 loc) · 5.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/env python3
"""
Çoklu Kullanıcı Test Script'i
Kullanım: python3 test_multi_user.py
"""
import socketio
import requests
import json
import time
import threading
from datetime import datetime
# Sunucu bilgileri
SERVER_URL = "http://localhost:8080"
SOCKET_URL = "http://localhost:8080"
class TestUser:
def __init__(self, username, password):
self.username = username
self.password = password
self.token = None
self.sio = socketio.Client()
self.messages_received = []
self.setup_socket_events()
def setup_socket_events(self):
"""Socket event handler'larını ayarla"""
@self.sio.event
def connect():
print(f"✅ {self.username} bağlandı!")
if self.token:
self.sio.emit('join', {'token': self.token})
@self.sio.event
def disconnect():
print(f"❌ {self.username} bağlantısı kesildi!")
@self.sio.on('status')
def on_status(data):
print(f"📢 {self.username}: {data['message']}")
@self.sio.on('new_message')
def on_new_message(data):
timestamp = datetime.fromisoformat(data['timestamp'].replace('Z', '+00:00'))
time_str = timestamp.strftime('%H:%M')
print(f"[{time_str}] {data['username']}: {data['content']}")
self.messages_received.append(data)
@self.sio.on('user_joined')
def on_user_joined(data):
print(f"👋 {data['username']} sohbete katıldı!")
@self.sio.on('user_left')
def on_user_left(data):
print(f"👋 {data['username']} sohbetten ayrıldı!")
def register_and_login(self):
"""Kayıt ol ve giriş yap"""
try:
# Kayıt ol
response = requests.post(f"{SERVER_URL}/register",
json={'username': self.username, 'password': self.password})
if response.status_code != 201:
print(f"❌ {self.username} kayıt hatası")
return False
# Giriş yap
response = requests.post(f"{SERVER_URL}/login",
json={'username': self.username, 'password': self.password})
if response.status_code == 200:
data = response.json()
self.token = data['token']
print(f"✅ {self.username} giriş başarılı!")
return True
else:
print(f"❌ {self.username} giriş hatası")
return False
except requests.exceptions.ConnectionError:
print(f"❌ {self.username} sunucuya bağlanılamıyor!")
return False
def connect_to_chat(self):
"""Chat'e bağlan"""
try:
self.sio.connect(SOCKET_URL)
return True
except Exception as e:
print(f"❌ {self.username} WebSocket hatası: {e}")
return False
def send_message(self, content):
"""Mesaj gönder"""
if self.token and self.sio.connected:
self.sio.emit('message', {'token': self.token, 'content': content})
print(f"💬 {self.username}: {content}")
else:
print(f"❌ {self.username} bağlantı yok!")
def disconnect(self):
"""Bağlantıyı kes"""
if self.sio.connected:
self.sio.disconnect()
def run_user_test(user, messages, delay=2):
"""Kullanıcı testini çalıştır"""
if not user.register_and_login():
return
if not user.connect_to_chat():
return
time.sleep(delay) # Bağlantı için bekle
# Mesajları gönder
for i, message in enumerate(messages):
user.send_message(message)
time.sleep(1) # Mesajlar arası bekle
time.sleep(3) # Son mesajların gelmesini bekle
user.disconnect()
def main():
"""Ana test fonksiyonu"""
print("🧪 Çoklu Kullanıcı Test Başlıyor...")
print("=" * 50)
# Test kullanıcıları
users = [
TestUser("elif", "pass123"),
TestUser("sercan", "pass456"),
TestUser("lordi", "pass789")
]
# Her kullanıcının mesajları
user_messages = [
["Merhaba Sercan! ❤️", "Seni çok seviyorum!", "Bugün nasılsın canım?"],
["Selam Elif! 😊", "Ben de seni seviyorum!", "Hava çok güzel bugün"],
["Herkese selam! 😄", "Ben Lordi, şakacı bir çocuğum!", "Haha, ne güzel sohbet bu!"]
]
# Thread'ler oluştur
threads = []
for i, (user, messages) in enumerate(zip(users, user_messages)):
thread = threading.Thread(
target=run_user_test,
args=(user, messages, i * 2) # Her kullanıcı 2 saniye arayla başlasın
)
threads.append(thread)
# Thread'leri başlat
for thread in threads:
thread.start()
# Tüm thread'lerin bitmesini bekle
for thread in threads:
thread.join()
print("\n🎉 Test tamamlandı!")
print("=" * 50)
# Sonuçları göster
for user in users:
print(f"📊 {user.username}: {len(user.messages_received)} mesaj aldı")
if __name__ == "__main__":
main()