-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathweb.py
More file actions
292 lines (245 loc) · 9.93 KB
/
web.py
File metadata and controls
292 lines (245 loc) · 9.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# Standard library
import base64
import os
import platform
import queue
import socket
import subprocess
import sys
import threading
import time
import uuid

# Third-party
import cv2
import numpy as np
import pandas as pd
import torch
from flask import Flask, request, jsonify, render_template, Response
from flask_socketio import SocketIO, emit
from werkzeug.utils import secure_filename

# Local
from kernel_utils import VideoReader, FaceExtractor, confident_strategy, predict_on_video_set
from training.zoo.classifiers import DeepFakeClassifier
# Flask app with Socket.IO layered on top for real-time frame streaming.
app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")  # NOTE(review): wildcard CORS admits any origin — confirm intended
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024 # 500MB max file size
app.config['SECRET_KEY'] = 'your-secret-key'  # NOTE(review): placeholder secret — must be replaced before deployment
# Ensure upload directory exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# Initialize model
def load_model(weights_path="weights/final_999_DeepFakeClassifier_tf_efficientnet_b7_ns_0_23",
               device="cuda"):
    """Build the EfficientNet-B7 deepfake classifier and load trained weights.

    Args:
        weights_path: Checkpoint file. May be a raw state_dict or a dict
            wrapping one under the "state_dict" key. Defaults to the
            previously hard-coded checkpoint so existing callers are
            unaffected.
        device: Torch device string for the model (default "cuda").

    Returns:
        The classifier in eval mode, converted to half precision for
        inference.
    """
    model = DeepFakeClassifier(encoder="tf_efficientnet_b7_ns").to(device)
    # Load onto CPU first so checkpoint deserialization needs no GPU headroom.
    checkpoint = torch.load(weights_path, map_location="cpu")
    state_dict = checkpoint.get("state_dict", checkpoint)
    # Strip the DataParallel "module." prefix so keys match with strict=True.
    model.load_state_dict(
        {k.replace("module.", ""): v for k, v in state_dict.items()}, strict=True
    )
    model.eval()
    return model.half()
# Global model and preprocessing state shared by all request handlers.
model = load_model()
frames_per_video = 32  # frames sampled per uploaded video
video_reader = VideoReader()
video_read_fn = lambda x: video_reader.read_frames(x, num_frames=frames_per_video)
face_extractor = FaceExtractor(video_read_fn)
input_size = 380  # model input resolution in pixels — presumably the training size; TODO confirm
strategy = confident_strategy
# Queue for real-time processing (bounded so producers can't grow unchecked)
frame_queue = queue.Queue(maxsize=32)
result_queue = queue.Queue()
# Store the Electron process (set by start_electron_app)
electron_process = None
def get_local_ip():
    """Best-effort discovery of this machine's LAN IPv4 address.

    "Connects" a UDP socket to a public address (no packets are actually
    sent for a UDP connect) and reads back the local endpoint chosen by
    the OS routing table.

    Returns:
        str: dotted-quad IPv4 address, or "127.0.0.1" when detection fails.
    """
    try:
        # Context manager guarantees the socket is closed even if connect
        # raises — the original leaked the fd on failure.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except OSError:
        # Narrowed from a bare except: only socket errors trigger the
        # loopback fallback; programming errors propagate.
        return "127.0.0.1"
def process_frame(frame):
    """Score a single BGR frame for deepfake likelihood.

    Args:
        frame: BGR image array (as produced by cv2 decoding) — assumed
            HxWx3 uint8; TODO confirm against callers.

    Returns:
        float: sigmoid fake-probability for the first detected face, or
        0.5 (maximum uncertainty) when no face is found.

    NOTE(review): face_extractor.process_video is handed an in-memory RGB
    array here, but the screen_frame handler passes it a file *path* —
    confirm the extractor accepts both; this path may be dead or broken.
    """
    # Convert frame to RGB
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # Extract faces
    faces = face_extractor.process_video(frame_rgb)
    if len(faces) > 0:
        # Process the first face found (only the first detection is scored)
        face = faces[0]["faces"][0] if faces[0]["faces"] else None
        if face is not None:
            # Resize face to model input size
            face = cv2.resize(face, (input_size, input_size))
            face = cv2.cvtColor(face, cv2.COLOR_RGB2BGR)
            # Convert to a [0,1]-normalized CHW tensor with a batch dimension
            face_tensor = torch.from_numpy(face).float().permute(2, 0, 1).unsqueeze(0)
            face_tensor = face_tensor / 255.0
            face_tensor = face_tensor.to("cuda")
            # Get prediction (half precision to match model.half())
            with torch.no_grad():
                pred = model(face_tensor.half())
                pred = torch.sigmoid(pred).cpu().numpy()[0][0]
            return pred
    # No face detected: neutral score
    return 0.5
def real_time_processor():
    """Daemon worker loop: pull frames from frame_queue, score, push results.

    Runs forever. Uses a blocking get with a timeout instead of the
    original empty()-check-plus-sleep pattern, which busy-waits (burning
    CPU) and is racy when multiple consumers share the queue.
    """
    while True:
        try:
            frame = frame_queue.get(timeout=0.5)
        except queue.Empty:
            # Nothing arrived within the timeout — loop and wait again.
            continue
        result = process_frame(frame)
        result_queue.put(result)
# Start the real-time processor thread.
# Daemon thread so it never blocks interpreter shutdown.
processor_thread = threading.Thread(target=real_time_processor, daemon=True)
processor_thread.start()
def start_electron_app():
    """Launch the Electron front-end via `npm start` in this script's directory.

    Looks for npm in the usual Windows install locations, then falls back
    to a real PATH lookup. Stores the child in the module-level
    `electron_process` global.

    Returns:
        bool: True if the process started and was still alive after a
        2-second grace period; False on any failure (error is printed).
    """
    global electron_process
    import shutil  # local import: only needed for the PATH lookup below
    try:
        current_dir = os.path.dirname(os.path.abspath(__file__))
        # Candidate npm locations (Windows installs first).
        candidates = [
            os.path.join(os.environ.get('APPDATA', ''), 'npm', 'npm.cmd'),
            os.path.join(os.environ.get('PROGRAMFILES', ''), 'nodejs', 'npm.cmd'),
            os.path.join(os.environ.get('PROGRAMFILES(X86)', ''), 'nodejs', 'npm.cmd'),
        ]
        npm_path = next((p for p in candidates if os.path.isfile(p)), None)
        if npm_path is None:
            # BUG FIX: the original tested os.path.exists('npm'), which checks
            # a relative path and never resolves against PATH — the fallback
            # was dead code. shutil.which performs the real PATH search.
            npm_path = shutil.which('npm')
        if not npm_path:
            raise Exception("Could not find npm. Please make sure Node.js is installed.")
        # Check if package.json exists before attempting to launch.
        package_json = os.path.join(current_dir, 'package.json')
        if not os.path.exists(package_json):
            raise Exception("package.json not found. Please make sure you're in the correct directory.")
        popen_kwargs = {'cwd': current_dir}
        if platform.system() == 'Windows':
            # Open in a fresh console window. shell=True is unnecessary (and
            # discouraged with a list argv) now that npm_path is absolute.
            popen_kwargs['creationflags'] = subprocess.CREATE_NEW_CONSOLE
        electron_process = subprocess.Popen([npm_path, 'start'], **popen_kwargs)
        # Grace period: poll() returns an exit code if the child already died.
        time.sleep(2)
        if electron_process.poll() is not None:
            raise Exception("Electron app failed to start. Check if all dependencies are installed.")
        return True
    except Exception as e:
        # Best-effort launcher: report and return False rather than crash
        # the web server (route handler relies on the boolean).
        print(f"Error starting Electron app: {e}")
        return False
@socketio.on('connect')
def handle_connect():
    """Log each new Socket.IO client connection."""
    print('Client connected')
@socketio.on('disconnect')
def handle_disconnect():
    """Log each Socket.IO client disconnection."""
    print('Client disconnected')
@socketio.on('screen_frame')
def handle_screen_frame(data):
    """Score one base64-encoded screen-capture frame and emit the verdict.

    Args:
        data: data-URL string ("data:image/...;base64,<payload>") sent by
            the browser — assumed format; TODO confirm against screen.html.

    Emits:
        'detection_result' back to the sending client with
        {'label': 'Fake'|'Real'|'No Face', 'confidence': float}.
    """
    try:
        # Decode base64 image (strip the data-URL prefix first).
        encoded_data = data.split(',')[1]
        nparr = np.frombuffer(base64.b64decode(encoded_data), np.uint8)
        frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if frame is None:
            print("Error: Could not decode frame")
            return
        # FIX: unique per-call temp file. The original fixed name
        # 'temp_frame.jpg' was clobbered/deleted when multiple clients
        # streamed frames concurrently.
        temp_frame_path = os.path.join(
            app.config['UPLOAD_FOLDER'], f'temp_frame_{uuid.uuid4().hex}.jpg')
        cv2.imwrite(temp_frame_path, frame)
        try:
            # Extract faces using the temporary file path.
            faces = face_extractor.process_video(temp_frame_path)
            # Take the first detected face, if any (branches consolidated —
            # the original duplicated the 'No Face' emit twice).
            face = None
            if len(faces) > 0:
                face = faces[0]["faces"][0] if faces[0]["faces"] else None
            if face is None:
                emit('detection_result', {
                    'label': 'No Face',
                    'confidence': 0.0
                })
                return
            # Resize to model input, back to BGR, then a [0,1] CHW tensor.
            face = cv2.resize(face, (input_size, input_size))
            face = cv2.cvtColor(face, cv2.COLOR_RGB2BGR)
            face_tensor = torch.from_numpy(face).float().permute(2, 0, 1).unsqueeze(0)
            face_tensor = face_tensor / 255.0
            face_tensor = face_tensor.to("cuda")
            # Get prediction (half precision to match model.half()).
            with torch.no_grad():
                pred = model(face_tensor.half())
                pred = torch.sigmoid(pred).cpu().numpy()[0][0]
            # Sigmoid > 0.5 means "Fake"; confidence is the winning class's
            # probability mass.
            label = "Fake" if pred > 0.5 else "Real"
            confidence = pred if pred > 0.5 else 1 - pred
            emit('detection_result', {
                'label': label,
                'confidence': float(confidence)
            })
        finally:
            # Clean up the temporary file even when inference raises.
            if os.path.exists(temp_frame_path):
                os.remove(temp_frame_path)
    except Exception as e:
        # Boundary handler: swallow and log so one bad frame doesn't kill
        # the socket connection.
        print(f"Error processing frame: {str(e)}")
@app.route('/')
def index():
    """Serve the main video-upload UI."""
    return render_template('index.html')
@app.route('/screen')
def screen():
    """Serve the real-time screen-capture detection UI."""
    return render_template('screen.html')
@app.route('/upload', methods=['POST'])
def upload_file():
    """Accept a video upload, run the deepfake classifier, return a JSON verdict.

    Expects a multipart form with a 'video' file field.

    Returns:
        200 with {'result': 'Fake'|'Real', 'confidence': float,
        'filename': str}, or 400 with {'error': str} on bad input.
    """
    if 'video' not in request.files:
        return jsonify({'error': 'No video file provided'}), 400
    file = request.files['video']
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400
    filename = secure_filename(file.filename)
    if not filename:
        # FIX: secure_filename can reduce a hostile name to '' — reject it
        # rather than saving onto the upload folder path itself.
        return jsonify({'error': 'No selected file'}), 400
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(filepath)
    try:
        # Process video through the shared extractor/model pipeline.
        predictions = predict_on_video_set(
            face_extractor=face_extractor,
            input_size=input_size,
            models=[model],
            strategy=strategy,
            frames_per_video=frames_per_video,
            videos=[filename],
            num_workers=1,
            test_dir=app.config['UPLOAD_FOLDER']
        )
    finally:
        # FIX: always delete the upload — the original leaked the file when
        # inference raised.
        if os.path.exists(filepath):
            os.remove(filepath)
    # predictions[0] is in [0, 1]; > 0.5 is interpreted as fake.
    result = "Fake" if predictions[0] > 0.5 else "Real"
    confidence = predictions[0] if predictions[0] > 0.5 else 1 - predictions[0]
    return jsonify({
        'result': result,
        'confidence': float(confidence),
        'filename': filename
    })
@app.route('/start-electron', methods=['POST'])
def start_electron():
    """Launch the Electron desktop shell; report whether startup succeeded."""
    return jsonify({'success': start_electron_app()})
if __name__ == '__main__':
    # Print both loopback and LAN URLs so other devices on the network can connect.
    local_ip = get_local_ip()
    print(f"\n=== Deepfake Detection Server ===")
    print(f"Local URL: http://localhost:5000")
    print(f"Network URL: http://{local_ip}:5000")
    print("===============================\n")
    # NOTE(review): debug=True while bound to 0.0.0.0 exposes the Werkzeug
    # debugger to the whole network — confirm this is development-only.
    socketio.run(app, host='0.0.0.0', port=5000, debug=True)