forked from YonkoSam/whatsapp-python-chatbot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscript.py
More file actions
322 lines (278 loc) · 15.1 KB
/
script.py
File metadata and controls
322 lines (278 loc) · 15.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
import os
import logging
import requests
from flask import Flask, request, jsonify
from dotenv import load_dotenv
import google.generativeai as genai
import json
load_dotenv()
app = Flask(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Directory for storing conversations
CONVERSATIONS_DIR = 'conversations'
if not os.path.exists(CONVERSATIONS_DIR):
os.makedirs(CONVERSATIONS_DIR)
logging.info(f"Created conversations directory at {CONVERSATIONS_DIR}")
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY')
WASENDER_API_TOKEN = os.getenv('WASENDER_API_TOKEN')
WASENDER_API_URL = "https://wasenderapi.com/api/send-message"
if GEMINI_API_KEY:
genai.configure(api_key=GEMINI_API_KEY)
else:
logging.error("GEMINI_API_KEY not found in environment variables. The application might not work correctly.")
@app.errorhandler(Exception)
def handle_global_exception(e):
"""Global handler for unhandled exceptions."""
logging.error(f"Unhandled Exception: {e}", exc_info=True)
return jsonify(status='error', message='An internal server error occurred.'), 500
# --- Load Persona ---
PERSONA_FILE_PATH = 'persona.json'
PERSONA_DESCRIPTION = "You are a helpful assistant." # Default persona
PERSONA_NAME = "Assistant"
BASE_PROMPT = "You are a helpful and concise AI assistant replying in a WhatsApp chat. Do not use Markdown formatting. Keep your answers short, friendly, and easy to read. If your response is longer than 3 lines, split it into multiple messages using \n every 3 lines. Each \n means a new WhatsApp message. Avoid long paragraphs or unnecessary explanations."
try:
with open(PERSONA_FILE_PATH, 'r') as f:
persona_data = json.load(f)
custom_description = persona_data.get('description', PERSONA_DESCRIPTION)
base_prompt = persona_data.get('base_prompt', BASE_PROMPT)
PERSONA_DESCRIPTION = f"{base_prompt}\n\n{custom_description}"
PERSONA_NAME = persona_data.get('name', PERSONA_NAME)
logging.info(f"Successfully loaded persona: {PERSONA_NAME}")
except FileNotFoundError:
logging.warning(f"Persona file not found at {PERSONA_FILE_PATH}. Using default persona.")
except json.JSONDecodeError:
logging.error(f"Error decoding JSON from {PERSONA_FILE_PATH}. Using default persona.")
except Exception as e:
logging.error(f"An unexpected error occurred while loading persona: {e}. Using default persona.")
# --- End Load Persona ---
def load_conversation_history(user_id):
"""Loads conversation history for a given user_id."""
file_path = os.path.join(CONVERSATIONS_DIR, f"{user_id}.json")
try:
with open(file_path, 'r') as f:
history = json.load(f)
# Ensure history is a list of dictionaries (pairs of user/assistant messages)
if isinstance(history, list) and all(isinstance(item, dict) and 'role' in item and 'parts' in item for item in history):
return history
else:
logging.warning(f"Invalid history format in {file_path}. Starting fresh.")
return []
except FileNotFoundError:
return []
except json.JSONDecodeError:
logging.error(f"Error decoding JSON from {file_path}. Starting fresh.")
return []
except Exception as e:
logging.error(f"Unexpected error loading history from {file_path}: {e}")
return []
def save_conversation_history(user_id, history):
"""Saves conversation history for a given user_id."""
file_path = os.path.join(CONVERSATIONS_DIR, f"{user_id}.json")
try:
with open(file_path, 'w') as f:
json.dump(history, f, indent=2)
except Exception as e:
logging.error(f"Error saving conversation history to {file_path}: {e}")
def split_message(text, max_lines=3, max_chars_per_line=100):
"""Split a long message into smaller chunks for better WhatsApp readability."""
# First split by existing newlines
paragraphs = text.split('\\n')
chunks = []
current_chunk = []
current_line_count = 0
for paragraph in paragraphs:
# Split long paragraphs into smaller lines
if len(paragraph) > max_chars_per_line:
words = paragraph.split()
current_line = []
current_length = 0
for word in words:
if current_length + len(word) + 1 <= max_chars_per_line:
current_line.append(word)
current_length += len(word) + 1
else:
if current_line_count >= max_lines:
chunks.append('\n'.join(current_chunk))
current_chunk = []
current_line_count = 0
current_chunk.append(' '.join(current_line))
current_line_count += 1
current_line = [word]
current_length = len(word)
if current_line:
if current_line_count >= max_lines:
chunks.append('\n'.join(current_chunk))
current_chunk = []
current_line_count = 0
current_chunk.append(' '.join(current_line))
current_line_count += 1
else:
if current_line_count >= max_lines:
chunks.append('\n'.join(current_chunk))
current_chunk = []
current_line_count = 0
current_chunk.append(paragraph)
current_line_count += 1
if current_chunk:
chunks.append('\n'.join(current_chunk))
return chunks
def get_gemini_response(message_text, conversation_history=None):
"""Generates a response from Gemini using the google-generativeai library, including conversation history."""
if not GEMINI_API_KEY:
logging.error("Gemini API key is not configured.")
return "Sorry, I'm having trouble connecting to my brain right now (API key issue)."
try:
# Using Gemini 2.0 Flash model with system instruction for persona
model_name = 'gemini-2.0-flash'
model = genai.GenerativeModel(model_name, system_instruction=PERSONA_DESCRIPTION)
logging.info(f"Sending prompt to Gemini (system persona active): {message_text[:200]}...")
if conversation_history:
# Use chat history if available
chat = model.start_chat(history=conversation_history)
response = chat.send_message(message_text)
else:
# For first message with no history
response = model.generate_content(message_text)
# Extract the text from the response
if response and hasattr(response, 'text') and response.text:
return response.text.strip()
elif response and response.candidates:
# Fallback if .text is not directly available but candidates are
try:
return response.candidates[0].content.parts[0].text.strip()
except (IndexError, AttributeError, KeyError) as e:
logging.error(f"Error parsing Gemini response candidates: {e}. Response: {response}")
return "I received an unusual response structure from Gemini. Please try again."
else:
logging.error(f"Gemini API (google-generativeai) returned an empty or unexpected response: {response}")
return "I received an empty or unexpected response from Gemini. Please try again."
except Exception as e:
logging.error(f"Error calling Gemini API with google-generativeai: {e}", exc_info=True)
return "I'm having trouble processing that request with my AI brain. Please try again later."
def send_whatsapp_message(recipient_number, message_content, message_type='text', media_url=None):
"""Sends a message via WaSenderAPI. Supports text and media messages."""
if not WASENDER_API_TOKEN:
logging.error("WaSender API token is not set. Please check .env file.")
return False
headers = {
'Authorization': f'Bearer {WASENDER_API_TOKEN}',
'Content-Type': 'application/json'
}
# Sanitize recipient_number to remove "@s.whatsapp.net"
if recipient_number and "@s.whatsapp.net" in recipient_number:
formatted_recipient_number = recipient_number.split('@')[0]
else:
formatted_recipient_number = recipient_number
payload = {
'to': formatted_recipient_number
}
if message_type == 'text':
payload['text'] = message_content
elif message_type == 'image' and media_url:
payload['imageUrl'] = media_url
if message_content:
payload['text'] = message_content
elif message_type == 'video' and media_url:
payload['videoUrl'] = media_url
if message_content:
payload['text'] = message_content
elif message_type == 'audio' and media_url:
payload['audioUrl'] = media_url
elif message_type == 'document' and media_url:
payload['documentUrl'] = media_url
if message_content:
payload['text'] = message_content
else:
if message_type != 'text':
logging.error(f"Media URL is required for message type '{message_type}'.")
return False
logging.error(f"Unsupported message type or missing content/media_url: {message_type}")
return False
logging.debug(f"Attempting to send WhatsApp message. Payload: {payload}")
try:
response = requests.post(WASENDER_API_URL, headers=headers, json=payload, timeout=20)
response.raise_for_status()
logging.info(f"Message sent to {recipient_number}. Response: {response.json()}")
return True
except requests.exceptions.RequestException as e:
status_code = e.response.status_code if e.response is not None else "N/A"
response_text = e.response.text if e.response is not None else "N/A"
logging.error(f"Error sending WhatsApp message to {recipient_number} (Status: {status_code}): {e}. Response: {response_text}")
if status_code == 422:
logging.error("WaSenderAPI 422 Error: This often means an issue with the payload (e.g., device_id, 'to' format, or message content/URL). Check the payload logged above and WaSenderAPI docs.")
return False
except Exception as e:
logging.error(f"An unexpected error occurred while sending WhatsApp message: {e}")
return False
@app.route('/webhook', methods=['POST'])
def webhook():
"""Handles incoming WhatsApp messages via webhook."""
data = request.json
logging.info(f"Received webhook data (first 200 chars): {str(data)[:200]}")
try:
if data.get('event') == 'messages.upsert' and data.get('data') and data['data'].get('messages'):
message_info = data['data']['messages']
# Check if it's a message sent by the bot itself
if message_info.get('key', {}).get('fromMe'):
logging.info(f"Ignoring self-sent message: {message_info.get('key', {}).get('id')}")
return jsonify({'status': 'success', 'message': 'Self-sent message ignored'}), 200
sender_number = message_info.get('key', {}).get('remoteJid')
incoming_message_text = None
message_type = 'unknown'
# Extract message content based on message structure
if message_info.get('message'):
msg_content_obj = message_info['message']
if 'conversation' in msg_content_obj:
incoming_message_text = msg_content_obj['conversation']
message_type = 'text'
elif 'extendedTextMessage' in msg_content_obj and 'text' in msg_content_obj['extendedTextMessage']:
incoming_message_text = msg_content_obj['extendedTextMessage']['text']
message_type = 'text'
if message_info.get('messageStubType'):
stub_params = message_info.get('messageStubParameters', [])
logging.info(f"Received system message of type {message_info['messageStubType']} from {sender_number}. Stub params: {stub_params}")
return jsonify({'status': 'success', 'message': 'System message processed'}), 200
if not sender_number:
logging.warning("Webhook received message without sender information.")
return jsonify({'status': 'error', 'message': 'Incomplete sender data'}), 400
# Sanitize sender_number to use as a filename
safe_sender_id = "".join(c if c.isalnum() else '_' for c in sender_number)
if message_type == 'text' and incoming_message_text:
logging.info(f"Processing text message from {sender_number} ({safe_sender_id}): {incoming_message_text}")
# Load conversation history
conversation_history = load_conversation_history(safe_sender_id)
# Get Gemini's reply, passing the history
gemini_reply = get_gemini_response(incoming_message_text, conversation_history)
if gemini_reply:
# Split the response into chunks and send them sequentially
message_chunks = split_message(gemini_reply)
for chunk in message_chunks:
if not send_whatsapp_message(sender_number, chunk, message_type='text'):
logging.error(f"Failed to send message chunk to {sender_number}")
break
# Delay between messages
import random
import time
if i < len(message_chunks) - 1:
delay = random.uniform(0.55, 1.5)
time.sleep(delay)
# Save the new exchange to history
# Ensure history format is compatible with genai: list of {'role': 'user'/'model', 'parts': ['text']}
conversation_history.append({'role': 'user', 'parts': [incoming_message_text]})
conversation_history.append({'role': 'model', 'parts': [gemini_reply]})
save_conversation_history(safe_sender_id, conversation_history)
elif incoming_message_text:
logging.info(f"Received '{message_type}' message from {sender_number}. No text content. Full data: {message_info}")
elif message_type != 'unknown':
logging.info(f"Received '{message_type}' message from {sender_number}. No text content. Full data: {message_info}")
else:
logging.warning(f"Received unhandled or incomplete message from {sender_number}. Data: {message_info}")
elif data.get('event'):
logging.info(f"Received event '{data.get('event')}' which is not 'messages.upsert'. Data: {str(data)[:200]}")
return jsonify({'status': 'success'}), 200
except Exception as e:
logging.error(f"Error processing webhook: {e}")
return jsonify({'status': 'error', 'message': 'Internal server error'}), 500
if __name__ == '__main__':
# For development with webhook testing via ngrok
app.run(debug=True, port=5001, host='0.0.0.0')