# game_server.py
from flask import Flask, send_from_directory, jsonify, request, redirect, session, make_response
from flask_socketio import SocketIO, emit, join_room, leave_room, disconnect
from pymongo import MongoClient
from bson.objectid import ObjectId
import random
import time
import math
from pydub import AudioSegment
from pydub.utils import make_chunks
import threading
import cmd
import numpy as np
import io
import os
import ssl
from werkzeug.utils import secure_filename
from threading import Thread
import bcrypt
import jwt
from functools import wraps
import datetime
import re
from bson.json_util import dumps
from bson import json_util
from bson import ObjectId
import json
import logging
import requests
from urllib.parse import urlencode
import secrets
from werkzeug.http import generate_etag
from config import DISCORD_CLIENT_ID, DISCORD_CLIENT_SECRET, DISCORD_REDIRECT_URI, DISCORD_BOT_TOKEN, DISCORD_GUILD_ID, ACCOUNT_LINKED_ROLE_ID, DISCORD_WEBHOOK_URL
# Signing key stored in app.config['SECRET_KEY']; /login and decode_token use it
# to sign and verify JWTs. It is regenerated on every process start, so tokens
# issued before a restart no longer verify afterwards.
secret_key = secrets.token_hex(16)
DISCORD_API_ENDPOINT = 'https://discord.com/api/v10'
DISCORD_OAUTH_STATE_EXPIRY = 600 # 10 minutes
app = Flask(__name__)
app.config['SECRET_KEY'] = secret_key
# NOTE(review): "*" accepts Socket.IO connections from any origin — confirm this is intended.
socketio = SocketIO(app, cors_allowed_origins="*")
# MongoDB setup
client = MongoClient('mongodb://localhost:27017/')
db = client['resonance_rumble']
users_collection = db['users']
sessions_collection = db['sessions']
skins_collection = db['skins']
weapons_collection = db['weapons']
classes_collection = db['classes']
enemies_collection = db['enemies']
# One document per connected Socket.IO session, keyed by socket id (see on_connect).
active_sockets_collection = db['active_sockets']
# Pending Discord OAuth "state" values, consumed once by discord_callback.
oauth_states = db['oauth_states']
def decode_token(token):
    """Return the ``user_id`` claim of a signed JWT, or ``None`` if unusable.

    Args:
        token: Encoded JWT, verified with the app's SECRET_KEY using HS256.

    Returns:
        The value of the ``user_id`` claim, or ``None`` when the token is
        expired, malformed, carries a bad signature, or has no ``user_id``
        claim.
    """
    try:
        payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # ExpiredSignatureError derives from InvalidTokenError, so this single
        # handler covers both expiry and every other validation failure.
        return None
    # A validly signed token without the claim is treated as invalid rather
    # than raising KeyError into the caller.
    return payload.get('user_id')
def token_required(f):
    """Decorator that authenticates a view via the ``Authorization`` header.

    The raw header value is handed to ``decode_token``. On success the wrapped
    view is called with the decoded user id as its first positional argument;
    otherwise a JSON error with HTTP 401 is returned.
    """
    def _unauthorized(message):
        return jsonify({'error': message}), 401

    @wraps(f)
    def decorated(*args, **kwargs):
        raw_token = request.headers.get('Authorization')
        if raw_token:
            uid = decode_token(raw_token)
            if uid:
                return f(uid, *args, **kwargs)
            return _unauthorized('Token is invalid or expired')
        return _unauthorized('Token is missing')

    return decorated
# Game state
# Socket.IO room that joined players are placed in; game-wide events are emitted to it.
main_room = 'main_game_room'
# Map dimensions. NOTE(review): not referenced in the visible part of this file — units/usage to be confirmed.
MAP_WIDTH = 3000
MAP_HEIGHT = 2000
def send_discord_alert(message):
    """Post ``message`` to the configured Discord webhook on a best-effort basis.

    Failures are logged and never raised: this is called from socket event
    handlers (join/disconnect), where a webhook outage must not abort the
    handler.

    Args:
        message: Text sent as the webhook's ``content`` field.
    """
    payload = {"content": message}
    try:
        # Without a timeout, requests waits indefinitely on an unresponsive host.
        response = requests.post(DISCORD_WEBHOOK_URL, json=payload, timeout=5)
    except requests.RequestException as exc:
        logging.error(f"Failed to send Discord alert: {exc}")
        return
    if response.status_code != 204:
        logging.error(f"Failed to send Discord alert: {response.status_code}, {response.text}")
def send_alert(message):
    """Broadcast a ``game_alert`` event carrying ``message`` to the main game room."""
    alert_payload = {'message': message}
    socketio.emit('game_alert', alert_payload, room=main_room)
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that understands ObjectId, Player and SynthCoin values."""

    def default(self, obj):
        """Return a JSON-serialisable stand-in for ``obj``.

        ObjectIds become strings, Players their ``to_dict()`` result and
        SynthCoins an ``x``/``y``/``radius`` mapping. Anything else is
        delegated to the base class.
        """
        if isinstance(obj, ObjectId):
            return str(obj)
        if isinstance(obj, Player):
            return obj.to_dict()
        if isinstance(obj, SynthCoin):
            return {'x': obj.x, 'y': obj.y, 'radius': obj.radius}
        return super().default(obj)
# Register the custom encoder on the Flask app.
# NOTE(review): Flask deprecated the `json_encoder` attribute in the 2.2 line — confirm the pinned Flask version still honours it.
app.json_encoder = CustomJSONEncoder
def serialize_weapon(weapon):
    """Return a shallow copy of ``weapon`` with top-level ObjectId values stringified."""
    serialized = {}
    for field, value in weapon.items():
        serialized[field] = str(value) if isinstance(value, ObjectId) else value
    return serialized
def serialize_game_state(game_state):
    """Build the client-facing snapshot of ``game_state``.

    Players are converted through ``to_dict()`` and coins reduced to
    ``x``/``y``/``radius``. The ``enemies`` and ``bullets`` lists are passed
    through by reference, not copied. Other keys of ``game_state`` are omitted.
    """
    players = {}
    for player_id, player in game_state['players'].items():
        players[player_id] = player.to_dict()

    serialized_state = {
        'players': players,
        'enemies': game_state['enemies'],
        'bullets': game_state['bullets'],
    }
    serialized_state['synth_coins'] = [
        {'x': coin.x, 'y': coin.y, 'radius': coin.radius}
        for coin in game_state['synth_coins']
    ]
    return serialized_state
def load_music_playlist():
    """Return the paths of the ``.mp3`` files found in the ``music`` folder.

    The folder is resolved relative to the current working directory. Entries
    are returned in sorted order so the playlist is deterministic across
    platforms (``os.listdir`` order is arbitrary).

    Returns:
        A list of ``music/<file>.mp3`` paths; empty when the folder does not
        exist, so that importing this module does not fail on a fresh
        checkout.
    """
    music_folder = 'music'
    try:
        entries = sorted(os.listdir(music_folder))
    except FileNotFoundError:
        logging.warning(f"Music folder '{music_folder}' not found; playlist is empty")
        return []
    return [os.path.join(music_folder, name) for name in entries if name.endswith('.mp3')]
class SynthCoin:
    """A collectible coin that drifts toward the nearest nearby active player."""

    def __init__(self, x, y):
        """Place a coin at ``(x, y)`` with fixed radius and follow parameters."""
        self.x, self.y = x, y
        self.radius = 5
        self.follow_radius = 100
        self.follow_speed = 2
        self.created_at = time.time()

    def update(self, players):
        """Move one step toward the closest unpaused player within ``follow_radius``.

        ``players`` is a mapping whose values expose ``x``, ``y`` and
        ``is_paused``. The coin stays put when no player qualifies or when it
        already sits exactly on the chosen player.
        """
        in_range = []
        for player in players.values():
            if player.is_paused:
                continue
            gap = math.hypot(player.x - self.x, player.y - self.y)
            if gap < self.follow_radius:
                in_range.append((gap, player))
        if not in_range:
            return

        # min() keeps the first of equally distant players, matching a strict "<" scan.
        gap, target = min(in_range, key=lambda pair: pair[0])
        if target and gap > 0:
            dx = target.x - self.x
            dy = target.y - self.y
            self.x += (dx / gap) * self.follow_speed
            self.y += (dy / gap) * self.follow_speed
# Shared in-memory game state, read and mutated by the HTTP routes and
# Socket.IO handlers in this module.
game_state = {
    'players': {},  # socket id (request.sid) -> Player
    'enemies': [],  # enemy dicts produced by generate_enemies()
    'bullets': [],  # whatever Player.shoot() returns for each shot
    'start_time': time.time(),
    'music_start_time': None,
    'current_song': None,
    'playlist': load_music_playlist(), # Load the playlist dynamically
    'song_position': 0,
    'is_playing': False,
    'synth_coins': [],  # SynthCoin instances currently on the map
    'player_upgrade_options': {}  # socket id -> upgrade options stored on level-up
}
MAX_ENEMIES = 10
ENEMY_SPEED = 1
BULLET_SPEED = 5
BULLET_LIFETIME = 3 # seconds
CHUNK_SIZE = 1024 # Size of each audio chunk to send
# Usernames currently in the game; on_join uses it to reject a second session
# for the same name, on_disconnect removes the name again.
active_players = set()
def generate_enemies(count):
    """Create ``count`` enemy dicts from randomly chosen database templates.

    Each enemy gets a fresh string id, a random position with both
    coordinates in ``[0, 1000]``, a shallow copy of the template attributes,
    and an integer ``experience`` that defaults to 10. Returns an empty list
    (after printing a notice) when no templates exist.
    """
    templates = list(enemies_collection.find())
    if not templates:
        print("No enemy templates found in the database")
        return []

    def _spawn(template):
        # Field order matters: x is drawn from the RNG before y.
        return {
            'id': str(ObjectId()),
            'name': template['name'],
            'x': random.randint(0, 1000),
            'y': random.randint(0, 1000),
            'image': template['image'],
            'attributes': template['attributes'].copy(),
            'difficulty': template['difficulty'],
            'current_health': template['attributes']['health'],
            'experience': int(template['attributes'].get('experience', 10)),
        }

    return [_spawn(random.choice(templates)) for _ in range(count)]
def is_valid_input(input_string):
    """Return True when ``input_string`` is a string free of risky tokens.

    Rejects non-string values and any string containing ``$`` (the MongoDB
    operator prefix), ``__proto__`` or ``constructor``.

    The ``$`` check must use a bare ``'$'``: the substring test is literal,
    so the regex-style ``'\\$'`` would only match a backslash followed by a
    dollar sign and let plain ``$gt``-style input through.
    """
    if not isinstance(input_string, str):
        return False
    forbidden_tokens = ('$', '__proto__', 'constructor')
    return not any(token in input_string for token in forbidden_tokens)
@app.before_request
def before_request():
    """Permanently redirect any non-secure request to its ``https://`` URL."""
    if request.is_secure:
        return None
    secure_url = request.url.replace('http://', 'https://', 1)
    return redirect(secure_url, code=301)
@app.route('/')
def index():
    """Serve ``index.html`` from the ``'.'`` directory as the landing page."""
    landing_page = 'index.html'
    return send_from_directory('.', landing_page)
@app.route('/initiate-discord-link', methods=['POST'])
@token_required
def initiate_discord_link(user_id):
    """Start the Discord OAuth2 flow for the authenticated user.

    Stores a random ``state`` bound to ``user_id`` with an expiry of
    ``DISCORD_OAUTH_STATE_EXPIRY`` seconds, then returns the Discord
    authorization URL as ``{'auth_url': ...}``.
    """
    state = secrets.token_urlsafe(32)
    expires_at = datetime.datetime.utcnow() + datetime.timedelta(seconds=DISCORD_OAUTH_STATE_EXPIRY)
    oauth_states.insert_one({'state': state, 'user_id': user_id, 'expires_at': expires_at})

    query_string = urlencode({
        'client_id': DISCORD_CLIENT_ID,
        'redirect_uri': DISCORD_REDIRECT_URI,
        'response_type': 'code',
        'scope': 'identify',
        'state': state,
    })
    return jsonify({'auth_url': f'{DISCORD_API_ENDPOINT}/oauth2/authorize?{query_string}'})
@app.route('/discord-callback')
def discord_callback():
    """Complete the Discord OAuth2 flow and link the account to the user.

    Steps: validate ``code``/``state`` query parameters, consume the stored
    state (single use, must not be expired), exchange the code for an access
    token, fetch the Discord profile, persist it on the user document and
    grant the "account linked" role.

    Returns:
        A plain-text error with HTTP 400 on any failure; otherwise the
        response body below (a single newline as this file stands).
    """
    # Bound every outbound call: without a timeout requests can block the
    # worker indefinitely when Discord is unreachable.
    http_timeout = 10  # seconds

    code = request.args.get('code')
    state = request.args.get('state')
    if not code or not state:
        return "Error: Missing parameters", 400

    # Verify state and get user_id; find_one_and_delete makes the state single-use.
    oauth_state = oauth_states.find_one_and_delete({
        'state': state,
        'expires_at': {'$gt': datetime.datetime.utcnow()}
    })
    if not oauth_state:
        return "Error: Invalid or expired state", 400
    user_id = oauth_state['user_id']

    # Exchange code for token
    data = {
        'client_id': DISCORD_CLIENT_ID,
        'client_secret': DISCORD_CLIENT_SECRET,
        'grant_type': 'authorization_code',
        'code': code,
        'redirect_uri': DISCORD_REDIRECT_URI
    }
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    try:
        r = requests.post(f'{DISCORD_API_ENDPOINT}/oauth2/token', data=data,
                          headers=headers, timeout=http_timeout)
    except requests.RequestException as exc:
        logging.error(f"Discord token exchange failed: {exc}")
        return "Error: Failed to exchange code for token", 400
    if r.status_code != 200:
        return "Error: Failed to exchange code for token", 400
    tokens = r.json()

    # Get user info
    headers = {
        'Authorization': f'Bearer {tokens["access_token"]}'
    }
    try:
        r = requests.get(f'{DISCORD_API_ENDPOINT}/users/@me', headers=headers,
                         timeout=http_timeout)
    except requests.RequestException as exc:
        logging.error(f"Discord user lookup failed: {exc}")
        return "Error: Failed to get user info", 400
    if r.status_code != 200:
        return "Error: Failed to get user info", 400
    user_data = r.json()

    # Update user in database
    users_collection.update_one(
        {'_id': ObjectId(user_id)},
        {'$set': {
            'discord_id': user_data['id'],
            'discord_username': f"{user_data['username']}#{user_data['discriminator']}",
            'discord_avatar': user_data['avatar']
        }}
    )
    # After successfully updating the user in the database:
    add_discord_role(user_data['id'], ACCOUNT_LINKED_ROLE_ID)
    return """
"""
@app.route('/unlink-discord', methods=['POST'])
@token_required
def unlink_discord(user_id):
    """Remove the Discord link (and linked role) from the authenticated user.

    Returns 404 when the user does not exist, 500 when the update modified
    nothing, and a success payload otherwise.
    """
    user_filter = {'_id': ObjectId(user_id)}
    user = users_collection.find_one(user_filter)
    if not user:
        return jsonify({'success': False, 'message': 'User not found'}), 404

    linked_discord_id = user.get('discord_id')
    if linked_discord_id:
        remove_discord_role(linked_discord_id, ACCOUNT_LINKED_ROLE_ID)

    cleared_fields = {
        'discord_id': '',
        'discord_username': '',
        'discord_avatar': ''
    }
    result = users_collection.update_one(user_filter, {'$unset': cleared_fields})
    if not result.modified_count > 0:
        return jsonify({'success': False, 'message': 'Failed to unlink Discord account'}), 500
    return jsonify({'success': True, 'message': 'Discord account unlinked successfully'})
@app.route('/get-discord-info')
@token_required
def get_discord_info(user_id):
    """Report whether the authenticated user has a linked Discord account.

    Returns the stored Discord username, avatar and id (``None`` when absent)
    plus a ``linked`` flag, or a 404 error when the user does not exist.
    """
    user = users_collection.find_one({'_id': ObjectId(user_id)})
    if not user:
        return jsonify({'error': 'User not found'}), 404

    info = {'linked': 'discord_id' in user}
    info['username'] = user.get('discord_username')
    info['avatar'] = user.get('discord_avatar')
    info['discord_id'] = user.get('discord_id')
    return jsonify(info)
@app.route('/.well-known/acme-challenge/<filename>')
def acme_challenge(filename):
    """Serve an ACME HTTP-01 challenge file from ``.well-known/acme-challenge``.

    The URL rule must declare the ``<filename>`` variable: Flask passes URL
    variables to the view as keyword arguments, so a rule without it cannot
    call a view that requires ``filename``.

    Args:
        filename: Name of the challenge token file under the app root's
            ``.well-known/acme-challenge`` directory.
    """
    challenge_dir = os.path.join(app.root_path, '.well-known', 'acme-challenge')
    return send_from_directory(challenge_dir, filename)
@app.route('/get-enemy-images')
def get_enemy_images():
    """Return the distinct ``image`` values of all enemy templates as JSON."""
    distinct_images = enemies_collection.distinct('image')
    return jsonify(list(distinct_images))
@app.route('/<path:filename>')
def serve_static(filename):
    """Serve a whitelisted static asset.

    The rule uses a ``path`` converter so that nested assets such as
    ``hats/<name>.png`` and ``images/<name>.png`` reach this view. A rule of
    plain ``'/'`` would clash with ``index`` and could never supply
    ``filename``.

    Args:
        filename: Requested path, possibly containing sub-directories.

    Returns:
        The file, ``403`` for a disallowed extension, or ``404`` when the
        path is unsafe or the file does not exist.
    """
    # Sanitise each segment on its own. Running secure_filename over the whole
    # path would turn "/" into "_" and make sub-directory assets unreachable.
    # Segments that sanitise to nothing ("..", "", ".") are rejected.
    segments = [secure_filename(part) for part in filename.split('/')]
    if not all(segments):
        return "File not found", 404
    filename = '/'.join(segments)
    lowered = filename.lower()

    # Check if the file has an allowed extension
    allowed_extensions = ('.mp3', '.js', '.png', '.ico', '.webmanifest', '.css', '.gif')
    if not lowered.endswith(allowed_extensions):
        return "Access denied", 403

    # Determine the appropriate directory based on file extension
    if lowered.endswith('.mp3'):
        file_directory = 'music'
    elif lowered.endswith('.js'):
        file_directory = 'static/js'
    else:
        # hats/..., images/..., favicon files and the webmanifest all live under static/
        file_directory = 'static'

    file_path = os.path.join(file_directory, filename)
    if not os.path.exists(file_path):
        return "File not found", 404
    # send_from_directory performs its own containment check on the joined path.
    return send_from_directory(file_directory, filename)
@app.route('/get-shop-items', methods=['POST'])
def get_shop_items():
    """Return the weapons a given player class is allowed to buy.

    Expects a JSON object with ``player_class`` (class ObjectId as string).

    Returns:
        A JSON list of weapon documents (``_id`` stringified), or a 400 error
        when the class id is missing, malformed or unknown.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Missing or non-object body: treat as "nothing provided", not a server error.
        data = {}
    player_class_id = data.get('player_class')
    if not player_class_id:
        return jsonify({'error': 'Player class not provided'}), 400
    # ObjectId() raises on malformed ids; reject them as a client error instead.
    if not ObjectId.is_valid(player_class_id):
        return jsonify({'error': 'Invalid player class'}), 400

    player_class = classes_collection.find_one({'_id': ObjectId(player_class_id)})
    if not player_class:
        return jsonify({'error': 'Invalid player class'}), 400

    allowed_types = player_class['weapon_restrictions']['allowed_types']
    disallowed_types = player_class['weapon_restrictions']['disallowed_types']
    weapons = list(weapons_collection.find({
        'weapon_type': {'$in': allowed_types, '$nin': disallowed_types}
    }))
    for weapon in weapons:
        weapon['_id'] = str(weapon['_id'])
    return jsonify(weapons), 200
@app.route('/buy-weapon', methods=['POST'])
def buy_weapon():
    """Buy a weapon for an in-game player using their Synth Coins.

    Expects a JSON object with ``player_id`` (socket id of the player in
    ``game_state``) and ``weapon_id`` (weapon ObjectId as string). On success
    the price is deducted, the weapon added, a ``player_update`` event is
    broadcast to the main room and the updated coins/weapons are returned.

    NOTE(review): the caller is not authenticated; any client that knows a
    ``player_id`` can spend that player's coins — confirm this is acceptable.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    player_id = data.get('player_id')
    weapon_id = data.get('weapon_id')
    # A malformed weapon id would make ObjectId() raise; report it as a bad request.
    if not player_id or not weapon_id or not ObjectId.is_valid(weapon_id):
        return jsonify({'success': False, 'message': 'Invalid request'}), 400

    player = game_state['players'].get(player_id)
    if not player:
        return jsonify({'success': False, 'message': 'Player not found'}), 404

    weapon = weapons_collection.find_one({'_id': ObjectId(weapon_id)})
    if not weapon:
        return jsonify({'success': False, 'message': 'Weapon not found'}), 404

    weapon_price = int(weapon.get('price', 0))
    if player.synth_coins < weapon_price:
        return jsonify({'success': False, 'message': 'Not enough Synth Coins'}), 400
    if len(player.weapons) >= player.weapon_limit:
        return jsonify({'success': False, 'message': 'Weapon limit reached'}), 400

    player.synth_coins -= weapon_price
    player.add_weapon(weapon)

    # Notify all clients about the updated player data
    socketio.emit('player_update', {
        'id': player_id,
        'player': player.to_dict()
    }, room=main_room)
    return jsonify({
        'success': True,
        'message': 'Weapon purchased successfully',
        'updatedPlayerData': {
            'synth_coins': player.synth_coins,
            'weapons': [serialize_weapon(w) for w in player.weapons]
        }
    }), 200
@app.route('/get-available-classes', methods=['GET'])
def get_available_classes():
    """Return every player class with its starting weapon document embedded.

    Each class's ``_id`` is stringified and ``starting_weapon`` is replaced by
    the referenced weapon document (its ``_id`` stringified too), or ``None``
    when the reference is missing or dangling.
    """
    classes = list(classes_collection.find())
    for class_data in classes:
        class_data['_id'] = str(class_data['_id'])
        starting_weapon = weapons_collection.find_one({'_id': class_data.get('starting_weapon')})
        # find_one returns None for a dangling reference; only touch real documents.
        if starting_weapon is not None:
            starting_weapon['_id'] = str(starting_weapon['_id'])
        class_data['starting_weapon'] = starting_weapon
    return jsonify(classes)
@app.route('/select-class', methods=['POST'])
def select_class():
    """Select a player class, persisting the choice when a username is given.

    Expects a JSON object with ``class_id`` and optionally ``username``.
    Classes that are not unlocked by default are currently always refused
    with 403 (unlock requirements are not implemented).
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    class_id = data.get('class_id')
    if not class_id:
        return jsonify({'error': 'Missing class_id'}), 400
    # ObjectId() raises on malformed ids; answer with a client error instead.
    if not ObjectId.is_valid(class_id):
        return jsonify({'error': 'Invalid class_id'}), 400

    player_class = classes_collection.find_one({'_id': ObjectId(class_id)})
    if not player_class:
        return jsonify({'error': 'Class not found'}), 404

    if not player_class['unlocked_by_default']:
        if not username:
            return jsonify({'error': 'Username required for locked classes'}), 403
        user = users_collection.find_one({'username': username})
        if not user:
            return jsonify({'error': 'User not found'}), 404
        # Check if the user has met the unlock requirements
        # This is a placeholder and should be implemented based on your game's logic
        return jsonify({'error': 'Class is locked'}), 403

    if username:
        users_collection.update_one(
            {'username': username},
            {'$set': {'selected_class': ObjectId(class_id)}}
        )
    return jsonify({'success': True, 'message': 'Class selected successfully'}), 200
@app.route('/get-available-skins', methods=['GET'])
def get_available_skins():
    """Return every skin document as JSON, with ``_id`` converted to a string."""
    all_skins = []
    for skin in skins_collection.find():
        # ObjectId is not JSON-serialisable by default.
        skin['_id'] = str(skin['_id'])
        all_skins.append(skin)
    return jsonify(all_skins), 200
@app.route('/get-unlocked-skins', methods=['POST'])
def get_unlocked_skins():
    """Return the details of every skin the given user has unlocked.

    Expects JSON with ``username``. The response carries the skin list and
    the user's ``skins_last_modified`` timestamp (current UTC time when the
    field is absent), tagged with an ETag so clients can revalidate; a
    matching ``If-None-Match`` yields an empty 304.
    """
    payload = request.json
    username = payload.get('username')
    if not username:
        return jsonify({'error': 'Username is required'}), 400

    user = users_collection.find_one({'username': username})
    if not user:
        return jsonify({'error': 'User not found'}), 404

    unlocked_ids = user.get('unlocked_skins', [])
    last_modified = user.get('skins_last_modified', datetime.datetime.utcnow())

    # Look up each unlocked skin lazily; ids without a matching document are skipped.
    skin_docs = (skins_collection.find_one({'_id': skin_id}) for skin_id in unlocked_ids)
    unlocked_skin_details = [
        {
            'id': str(skin['_id']),
            'type': skin['type'],
            'name': skin['name'],
            'value': skin['value'],
            'effect': skin['effect'],
            'rarity': skin['rarity']
        }
        for skin in skin_docs
        if skin
    ]

    response_data = {
        'skins': unlocked_skin_details,
        'last_modified': last_modified.isoformat()
    }
    # json_util handles MongoDB-specific types while computing the ETag input.
    etag_source = json.dumps(response_data, default=json_util.default)
    etag = generate_etag(etag_source.encode('utf-8'))
    if request.if_none_match and etag in request.if_none_match:
        return '', 304

    response = jsonify(response_data)
    response.set_etag(etag)
    response.last_modified = last_modified
    return response, 200
@app.route('/select-skin', methods=['POST'])
def select_skin():
    """Select one of the user's unlocked skins for a given skin slot.

    Expects JSON with ``username``, ``skin_type`` (slot name) and ``skin_id``.
    On success the selection is stored on the user, and if that user is
    currently in the game their in-memory skin is updated and a
    ``skin_update`` event is broadcast to the main room.

    Returns:
        400 for missing/invalid fields or a locked skin, 404 for an unknown
        user, 500 on unexpected errors, otherwise a success payload.
    """
    try:
        data = request.json
        username = data.get('username')
        skin_type = data.get('skin_type')
        skin_id = data.get('skin_id')
        if not all([username, skin_type, skin_id]):
            return jsonify({'error': 'Missing required fields'}), 400
        # username becomes a query value: a JSON object here would be read by
        # MongoDB as a query operator, so only plain strings are accepted.
        if not isinstance(username, str):
            return jsonify({'error': 'Invalid username'}), 400
        # skin_type is interpolated into an update path ("selected_skins.<type>");
        # restrict it to a simple identifier so "." or "$" cannot redirect the write.
        if not isinstance(skin_type, str) or not re.fullmatch(r'[A-Za-z0-9_]+', skin_type):
            return jsonify({'error': 'Invalid skin type'}), 400

        user = users_collection.find_one({'username': username})
        if not user:
            return jsonify({'error': 'User not found'}), 404

        if not ObjectId.is_valid(skin_id):
            return jsonify({'error': 'Invalid skin ID format'}), 400
        skin_object_id = ObjectId(skin_id)

        # Verify that the skin exists and is unlocked for the user
        skin = skins_collection.find_one({'_id': skin_object_id})
        if not skin or skin_object_id not in user.get('unlocked_skins', []):
            return jsonify({'error': 'Invalid or locked skin'}), 400

        # Update the user's selected skins and last modified time
        result = users_collection.update_one(
            {'username': username},
            {
                '$set': {
                    f'selected_skins.{skin_type}': skin_object_id,
                    'skins_last_modified': datetime.datetime.utcnow()
                }
            }
        )
        if result.modified_count == 0:
            # Check if the skin was already selected
            current_skin = user.get('selected_skins', {}).get(skin_type)
            if current_skin and current_skin == skin_object_id:
                return jsonify({'success': True, 'message': 'Skin already selected'}), 200
            return jsonify({'error': 'Failed to update skin'}), 500

        updated_skin_data = {
            skin_type: {
                'name': skin['name'],
                'value': skin['value'],
                'effect': skin['effect']
            }
        }

        # Find the player's socket ID (first in-game player with this name)
        player_socket_id = next(
            (sid for sid, player in game_state['players'].items() if player.name == username),
            None,
        )
        if player_socket_id:
            # Update the player's skin in the game state
            game_state['players'][player_socket_id].skin.update(updated_skin_data)
            # Emit a 'skin_update' event to all connected clients
            socketio.emit('skin_update', {
                'player_id': player_socket_id,
                'skin_data': updated_skin_data
            }, room=main_room)
        return jsonify({'success': True, 'message': 'Skin selected and updated successfully'}), 200
    except Exception as e:
        # Top-level boundary for this view: log and report a generic failure.
        app.logger.error(f"Error in select_skin: {str(e)}")
        return jsonify({'error': 'An internal error occurred'}), 500
@app.route('/signup', methods=['POST'])
def signup():
    """Create a new user account from ``username``, ``email`` and ``password``.

    Validation failures (missing fields, bad username/email format, short
    password, suspicious input, duplicate username or email) return 400 with
    a ``message``. On success the user is stored with a bcrypt-hashed
    password, the default skin and the default-unlocked classes, and 201 is
    returned. Unexpected errors are logged and reported as 500.
    """
    def _reject(message):
        return jsonify({'success': False, 'message': message}), 400

    try:
        payload = request.json
        username = payload.get('username')
        email = payload.get('email')
        password = payload.get('password')

        if not (username and email and password):
            return _reject('All fields are required')
        # Username validation
        if re.match(r'^[a-zA-Z0-9_]{3,20}$', username) is None:
            return _reject('Username must be 3-20 characters long and contain only letters, numbers, and underscores')
        # Email validation
        if re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email) is None:
            return _reject('Invalid email format')
        # Password length validation
        if len(password) < 8:
            return _reject('Password must be at least 8 characters long')
        # Check for potentially harmful input
        if not (is_valid_input(username) and is_valid_input(email)):
            return _reject('Invalid input detected')

        duplicate = users_collection.find_one({'$or': [{'username': username}, {'email': email}]})
        if duplicate:
            return _reject('Username or email already exists')

        hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
        default_class_ids = [
            str(class_doc['_id'])
            for class_doc in classes_collection.find({'unlocked_by_default': True})
        ]
        users_collection.insert_one({
            'username': username,
            'email': email,
            'password': hashed_password,
            'created_at': datetime.datetime.utcnow(),
            "selected_skins": {
                "color": ObjectId("669e1272813db368f3b19519"),  # Glowy Blue
                "hat": ObjectId(),
                "image": ObjectId()
            },
            "unlocked_skins": [ObjectId("669e1272813db368f3b19519")],  # Glowy Blue
            "unlocked_classes": default_class_ids,
            "selected_class": default_class_ids[0] if default_class_ids else None
        })
        return jsonify({'success': True, 'message': 'User created successfully'}), 201
    except Exception as e:
        app.logger.error(f"Error during signup: {str(e)}")
        return jsonify({'success': False, 'message': 'An error occurred during signup'}), 500
@app.route('/login', methods=['POST'])
def login():
    """Authenticate a user and issue a one-day JWT session token.

    Expects a JSON object with string ``username`` and ``password``.

    Returns:
        ``{'success': True, 'sessionToken': ...}`` on success; 401 for
        missing, malformed or wrong credentials; 500 when issuing or storing
        the token fails.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')

    # Both values must be plain strings. A JSON object as username would be
    # read by MongoDB as a query operator, and a missing password would crash
    # on .encode().
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'success': False, 'message': 'Invalid username or password'}), 401

    user = users_collection.find_one({'username': username})
    if not user or not bcrypt.checkpw(password.encode('utf-8'), user['password']):
        return jsonify({'success': False, 'message': 'Invalid username or password'}), 401

    try:
        session_token = jwt.encode(
            {'user_id': str(user['_id']), 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1)},
            app.config['SECRET_KEY'],
            algorithm='HS256'
        )
        # If using PyJWT < 2.0.0, session_token will be bytes, so we need to decode it
        if isinstance(session_token, bytes):
            session_token = session_token.decode('utf-8')
        sessions_collection.insert_one({
            'user_id': user['_id'],
            'token': session_token,
            'created_at': datetime.datetime.utcnow()
        })
        return jsonify({'success': True, 'sessionToken': session_token}), 200
    except Exception as e:
        app.logger.error(f"Error during login: {str(e)}")
        return jsonify({'success': False, 'message': 'An error occurred during login'}), 500
@app.route('/verify-session', methods=['POST'])
def verify_session():
    """Validate a session token and report the username it belongs to.

    Expects JSON with ``sessionToken``. Returns ``valid: True`` plus the
    username when the token verifies and the user exists; 401 for a missing,
    expired or invalid token or an unknown user; 500 on unexpected errors.
    """
    def _invalid(message, status=401):
        return jsonify({'valid': False, 'message': message}), status

    session_token = request.json.get('sessionToken')
    if not session_token:
        return _invalid('No session token provided')

    try:
        claims = jwt.decode(session_token, app.config['SECRET_KEY'], algorithms=['HS256'])
        user = users_collection.find_one({'_id': ObjectId(claims['user_id'])})
        if not user:
            return _invalid('User not found')
        return jsonify({'valid': True, 'username': user['username']}), 200
    except jwt.ExpiredSignatureError:
        # Must precede InvalidTokenError, which is its base class.
        return _invalid('Session expired')
    except jwt.InvalidTokenError:
        return _invalid('Invalid token')
    except Exception as e:
        app.logger.error(f"Error during session verification: {str(e)}")
        return _invalid('An error occurred during session verification', 500)
@app.route('/get-player-skin', methods=['POST'])
def get_player_skin():
    """Return the selected skins of a user, with a default for guests/unknowns.

    Expects JSON with ``username``. Names starting with ``Guest-`` get the
    guest default, unknown users the regular default. For known users each
    selected skin is resolved to its name/value/effect; the response is
    tagged with an ETag and marked publicly cacheable for one hour.
    """
    def _fallback_skin(name):
        return {
            'color': {
                'name': name,
                'value': '#00ffff',
                'effect': None
            }
        }

    payload = request.json
    username = payload.get('username')
    if not username:
        return jsonify({'error': 'Username is required'}), 400

    # Guests have no database record.
    if username.startswith('Guest-'):
        return jsonify(_fallback_skin('Guest Blue')), 200

    user = users_collection.find_one({'username': username})
    if not user:
        return jsonify(_fallback_skin('Default Blue')), 200

    skin_data = {}
    for slot, skin_id in user.get('selected_skins', {}).items():
        skin = skins_collection.find_one({'_id': skin_id})
        if not skin:
            continue
        skin_data[slot] = {
            'name': skin['name'],
            'value': skin['value'],
            'effect': skin['effect']
        }
    if not skin_data:
        # None of the selected ids resolved to a skin document.
        skin_data = _fallback_skin('Default Blue')

    # Generate ETag for caching
    canonical_json = json.dumps(skin_data, sort_keys=True)
    etag = generate_etag(canonical_json.encode())
    if request.if_none_match and etag in request.if_none_match:
        return '', 304

    response = make_response(jsonify(skin_data))
    response.set_etag(etag)
    response.cache_control.max_age = 3600  # Cache for 1 hour
    response.cache_control.public = True
    return response, 200
@socketio.on('start_audio')
def start_audio():
    """Handle the ``start_audio`` socket event by starting the audio streamer."""
    streamer = audio_streamer
    streamer.start()
@socketio.on('stop_audio')
def stop_audio():
    """Handle the ``stop_audio`` socket event by stopping the audio streamer."""
    streamer = audio_streamer
    streamer.stop()
@socketio.on('request_music_sync')
def on_request_music_sync():
    """Send the current song and timing to a joined player while music plays.

    Nothing is emitted when the requesting socket is not a player in the
    game or when no music is playing.
    """
    if request.sid not in game_state['players']:
        return
    if not music_player.is_playing:
        return
    sync_payload = {
        'song': music_player.get_current_song(),
        'startTime': music_player.start_time,
        'serverTime': time.time(),
        'songDuration': music_player.get_current_song_duration()
    }
    emit('music_sync', sync_payload)
@socketio.on('connect')
def on_connect():
    """Record a newly connected socket with the current time as ``last_active``."""
    sid = request.sid
    print(f"Player {sid} connected")
    touch = {'$set': {'last_active': datetime.datetime.utcnow()}}
    # Upsert: create the socket record if it does not exist yet.
    active_sockets_collection.update_one({'_id': sid}, touch, upsert=True)
@socketio.on('disconnect')
def on_disconnect():
    """Clean up after a socket disconnects.

    If the socket belonged to a joined player, the player is removed from
    the game, their username is freed, the room is notified and a Discord
    alert is sent. The socket's record is deleted in every case.
    """
    player_id = request.sid
    players = game_state['players']
    if player_id in players:
        username = players[player_id].name
        del players[player_id]
        active_players.discard(username)
        socketio.emit('player_disconnected', player_id, to=main_room)
        send_discord_alert(f"👋 Player {username} has disconnected!")
    print(f"Player {player_id} disconnected")
    active_sockets_collection.delete_one({'_id': player_id})
@socketio.on('player_paused')
def on_player_paused(data):
    """Store the sender's pause flag (``data['isPaused']``) and broadcast the change."""
    player_id = request.sid
    if player_id not in game_state['players']:
        return
    player = game_state['players'][player_id]
    player.is_paused = data['isPaused']
    update = {'id': player_id, 'player': player.to_dict()}
    emit('player_update', update, to=main_room)
@socketio.on('join')
def on_join(data):
    """Add the connecting socket to the game as a new player.

    Reads ``name`` and ``class_id`` from ``data``, rejects usernames that are
    already playing, resolves the player's skin, class and starting weapon
    from the database, registers the Player in ``game_state`` and sends the
    initial state to the new player and a ``new_player`` notice to the room.
    Any exception is logged and reported to the client as ``join_error``.
    """
    try:
        player_id = request.sid
        username = data.get('name')
        player_class_id = data.get('class_id')
        # Check if the player is already in the game
        if username in active_players:
            emit('join_error', {'message': 'You are already playing in another window or device.'})
            return
        join_room(main_room)
        # Get player skin: resolve each selected skin id; unknown users get the default colour
        user = users_collection.find_one({'username': username})
        if user:
            selected_skins = user.get('selected_skins', {})
            skin_data = {}
            for skin_type, skin_id in selected_skins.items():
                skin = skins_collection.find_one({'_id': skin_id})
                if skin:
                    skin_data[skin_type] = {
                        'name': skin['name'],
                        'value': skin['value'],
                        'effect': skin['effect']
                    }
        else:
            skin_data = {
                'color': {
                    'name': 'Default Blue',
                    'value': '#00ffff',
                    'effect': None
                }
            }
        # If no class_id is provided or the provided class_id is invalid, select a random unlocked_by_default class
        # (a malformed class_id makes ObjectId() raise, which ends up in the except branch below)
        if not player_class_id or not classes_collection.find_one({'_id': ObjectId(player_class_id)}):
            default_classes = list(classes_collection.find({'unlocked_by_default': True}))
            if default_classes:
                player_class = random.choice(default_classes)
                player_class_id = str(player_class['_id'])
            else:
                emit('join_error', {'message': 'No default classes available'})
                return
        else:
            player_class = classes_collection.find_one({'_id': ObjectId(player_class_id)})
        player_class['weapon_limit'] = int(player_class.get('weapon_limit', 1)) # Ensure this is an integer
        # Get starting weapon
        weapon = weapons_collection.find_one({'_id': player_class['starting_weapon']})
        if weapon:
            weapon['_id'] = str(weapon['_id']) # Convert ObjectId to string
        else:
            weapon = None # No starting weapon document found; the player starts unarmed
        new_player = Player(
            x=random.randint(50, 1000),
            y=random.randint(50, 1000),
            name=username,
            skin=skin_data,
            weapon=weapon,
            weapons=[weapon] if weapon else [], # Weapon list starts with the starting weapon, if any
            player_class=player_class
        )
        # These attributes are (re)assigned explicitly after construction.
        new_player.weapons = [weapon] if weapon else []
        new_player.synth_coins = 0
        new_player.is_paused = False # Ensure the player starts unpaused
        new_player.weapon_limit = player_class['weapon_limit']
        game_state['players'][player_id] = new_player
        active_players.add(username)
        serializable_game_state = serialize_game_state(game_state)
        # The snapshot shares the 'enemies' list object with game_state, and "+=" extends
        # that list in place, so enemies generated here are part of the snapshot sent below.
        if len(game_state['enemies']) < MAX_ENEMIES:
            game_state['enemies'] += generate_enemies(MAX_ENEMIES - len(game_state['enemies']))
        # Send game state to the new player
        emit('game_state', json.loads(json.dumps(serializable_game_state, cls=CustomJSONEncoder)), to=player_id)
        # Notify all other players about the new player
        emit('new_player', {'id': player_id, 'player': new_player.to_dict()}, to=main_room, include_self=False)
        emit('synth_coins_update', {'coins': [{'x': coin.x, 'y': coin.y} for coin in game_state['synth_coins']]}, to=player_id)
        # Then send music sync if music is playing
        if music_player.is_playing:
            emit('music_sync', {
                'song': music_player.get_current_song(),
                'startTime': music_player.start_time,
                'serverTime': time.time(),
                'songDuration': music_player.get_current_song_duration()
            })
        send_discord_alert(f"👋 Player {username} has joined the game!")
        print(f"Player {username} (ID: {player_id}) joined the game")
    except Exception as e:
        logging.error(f"Error in on_join: {str(e)}")
        emit('join_error', {'message': 'An error occurred while joining the game. Please try again.'})
@socketio.on('player_update')
def player_update(data):
    """Apply a client state update to the sender's player and broadcast the result."""
    player_id = request.sid
    players = game_state['players']
    if player_id not in players:
        return
    player = players[player_id]
    player.update(data)
    # Round-trip through the custom encoder so ObjectIds etc. become plain JSON types.
    encoded = json.dumps(player.to_dict(), cls=CustomJSONEncoder)
    emit('player_update', {'id': player_id, 'player': json.loads(encoded)}, to=main_room)
@socketio.on('collect_synth_coin')
def on_collect_synth_coin(data):
    """Credit the sender for collecting the coin at ``(coin_x, coin_y)``.

    The first coin within 0.1 units of the reported position is removed, the
    player gains 100 Synth Coins and a ``coin_collected`` event is sent to
    that player only. Nothing happens when no coin matches.
    """
    player_id = request.sid
    if player_id not in game_state['players']:
        return
    player = game_state['players'][player_id]
    coin_x, coin_y = data['coin_x'], data['coin_y']

    coins = game_state['synth_coins']
    collected = next(
        (coin for coin in coins if math.hypot(coin.x - coin_x, coin.y - coin_y) < 0.1),
        None,
    )
    if collected is None:
        return

    player.synth_coins += 100
    coins.remove(collected)
    print(f"Player {player.name} collected a coin. New count: {player.synth_coins}")
    # Send only to the player who collected the coin
    socketio.emit('coin_collected', {
        'player_id': player_id,
        'coin_count': player.synth_coins
    }, room=player_id)
@socketio.on('player_shoot')
def player_shoot(data):
    """Fire the sender's selected weapon and broadcast the resulting bullet.

    ``data`` supplies ``angle``, ``weaponX``, ``weaponY`` and an optional
    ``weaponIndex`` (default 0). Out-of-range indices and shots for which
    ``Player.shoot`` returns nothing are ignored.
    """
    player_id = request.sid
    if player_id not in game_state['players']:
        return
    shooter = game_state['players'][player_id]
    angle, weapon_x, weapon_y = data['angle'], data['weaponX'], data['weaponY']
    weapon_index = data.get('weaponIndex', 0)
    if not (0 <= weapon_index < len(shooter.weapons)):
        return
    bullet = shooter.shoot(angle, weapon_x, weapon_y, weapon_index)
    if not bullet:
        return
    game_state['bullets'].append(bullet)
    emit('new_bullet', bullet, room=main_room)
@socketio.on('upgrade_selected')
def on_upgrade_selected(data):
    """Apply the upgrade a player picked to their weapon's base attributes.

    ``data`` carries ``upgradeIndex``, ``upgradeType`` (attribute name) and
    ``upgradeValue`` (amount added). On success the player is notified with
    ``upgrade_applied``, unpaused, and a ``player_update`` is broadcast. When
    the player is unknown, has no weapon, or the attribute does not exist,
    the problem is logged and nothing changes.

    NOTE(review): the upgrade value is taken from the client as-is and is not
    checked against the options stored in
    ``game_state['player_upgrade_options']`` — confirm whether it should be.
    """
    player_id = request.sid
    upgrade_index = data['upgradeIndex']
    upgrade_type = data['upgradeType']
    upgrade_value = data['upgradeValue']
    print(f"Player {player_id} selected upgrade: Type: {upgrade_type}, Value: {upgrade_value}, Index: {upgrade_index}")

    player = game_state['players'].get(player_id)
    if player is None:
        print(f"Error: Player ID {player_id} not found in game state")
        return

    # A player can join without a starting weapon (weapon is None), and a
    # weapon document may lack 'base_attributes'; resolve both safely before
    # touching the attributes.
    weapon = player.weapon
    attributes = weapon.get('base_attributes') if isinstance(weapon, dict) else None
    print(f"Player {player.name} (ID: {player_id}) current weapon attributes:")
    print(attributes)

    if attributes is None or upgrade_type not in attributes:
        error_msg = f"Upgrade attribute {upgrade_type} not found in player's weapon attributes"
        print(f"Error: {error_msg}")
        logging.error(error_msg)
        return

    attributes[upgrade_type] += upgrade_value
    new_value = attributes[upgrade_type]

    # Notify the player about the upgrade
    socketio.emit('upgrade_applied', {
        'attribute': upgrade_type,
        'value': upgrade_value,
        'new_value': new_value
    }, room=player_id)
    player.is_paused = False
    socketio.emit('player_update', {'id': player_id, 'player': player.to_dict()}, to=main_room)
    print(f"Player {player.name} (ID: {player_id}) updated weapon attributes:")
    print(attributes)
def add_discord_role(user_id, role_id):
    """Grant ``role_id`` to Discord member ``user_id`` in the configured guild.

    Best effort: the outcome is printed and network failures are swallowed,
    so a Discord outage does not break the account-linking request that
    calls this.

    Args:
        user_id: Discord user id of the guild member.
        role_id: Discord role id to add.
    """
    url = f'https://discord.com/api/v10/guilds/{DISCORD_GUILD_ID}/members/{user_id}/roles/{role_id}'
    headers = {
        'Authorization': f'Bot {DISCORD_BOT_TOKEN}',
        'Content-Type': 'application/json'
    }
    try:
        # Without a timeout, requests waits indefinitely on an unresponsive host.
        response = requests.put(url, headers=headers, timeout=10)
    except requests.RequestException as exc:
        print(f"Failed to add role to user {user_id}. Error: {exc}")
        return
    if response.status_code == 204:
        print(f"Successfully added role to user {user_id}")
    else:
        print(f"Failed to add role to user {user_id}. Status code: {response.status_code}")
def remove_discord_role(user_id, role_id):
    """Remove ``role_id`` from Discord member ``user_id`` in the configured guild.

    Best effort: the outcome is printed and network failures are swallowed,
    so a Discord outage does not prevent the local unlink from completing.

    Args:
        user_id: Discord user id of the guild member.
        role_id: Discord role id to remove.
    """
    url = f'https://discord.com/api/v10/guilds/{DISCORD_GUILD_ID}/members/{user_id}/roles/{role_id}'
    headers = {
        'Authorization': f'Bot {DISCORD_BOT_TOKEN}',
        'Content-Type': 'application/json'
    }
    try:
        # Without a timeout, requests waits indefinitely on an unresponsive host.
        response = requests.delete(url, headers=headers, timeout=10)
    except requests.RequestException as exc:
        print(f"Failed to remove role from user {user_id}. Error: {exc}")
        return
    if response.status_code == 204:
        print(f"Successfully removed role from user {user_id}")
    else:
        print(f"Failed to remove role from user {user_id}. Status code: {response.status_code}")
def get_random_upgrades(count):
    """Return up to ``count`` distinct weapon upgrades chosen at random.

    Args:
        count: Number of upgrade options wanted. Values larger than the pool
            are clamped to the pool size and negative values to zero, so the
            call never raises ``ValueError`` from ``random.sample``.

    Returns:
        A list of upgrade dicts, each with ``type``, ``name`` and ``value``.
    """
    all_upgrades = [
        {'type': 'bullet_speed', 'name': 'Bullet Speed', 'value': 1},
        {'type': 'bullet_size', 'name': 'Bullet Size', 'value': 1},
        {'type': 'damage', 'name': 'Damage', 'value': 2},
        {'type': 'fire_rate', 'name': 'Fire Rate', 'value': 0.1}
    ]
    count = max(0, min(count, len(all_upgrades)))
    return random.sample(all_upgrades, count)
def cleanup_stale_sockets():
    """Disconnect and forget sockets that have been inactive for five minutes.

    Every record in ``active_sockets_collection`` whose ``last_active`` is
    older than the cutoff is disconnected (best effort) and deleted. A final
    bulk delete removes any stale record the loop did not get to.
    """
    stale_time = datetime.datetime.utcnow() - datetime.timedelta(minutes=5)
    stale_filter = {'last_active': {'$lt': stale_time}}
    for record in active_sockets_collection.find(stale_filter):
        socket_id = record['_id']
        try:
            socketio.server.disconnect(socket_id)
        except Exception as exc:
            # A failed disconnect must not stop the purge of the remaining
            # records; the stale entry is still deleted below.
            logging.error(f"Could not disconnect stale socket {socket_id}: {exc}")
        active_sockets_collection.delete_one({'_id': socket_id})
        print(f"Cleaned up stale socket: {socket_id}")
    active_sockets_collection.delete_many(stale_filter)
def add_experience(player, amount):
    """Add experience to a player and level them up once if the bar is full.

    On a level-up the overflow experience is carried over, the requirement
    grows by 20% and three upgrade options are stored under the player's id
    in ``game_state['player_upgrade_options']``. At most one level is granted
    per call; leftover experience counts on the next call.

    Args:
        player: Player object with ``experience``, ``max_experience`` and
            ``level`` attributes.
        amount: Experience points to add.

    Returns:
        True if the player levelled up, otherwise False.
    """
    player.experience += amount
    if player.experience < player.max_experience:
        return False
    player.level += 1
    player.experience -= player.max_experience
    player.max_experience = int(player.max_experience * 1.2)
    # Look the id up by identity. The default avoids a StopIteration escaping
    # when the player is no longer registered in the game state.
    player_id = next(
        (pid for pid, p in game_state['players'].items() if p is player),
        None,
    )
    if player_id is not None:
        game_state['player_upgrade_options'][player_id] = get_random_upgrades(3)
    return True
def update_synth_coins(players):
    """Pull each coin toward its nearest unpaused player and collect on touch.

    For every coin, the closest unpaused player inside the coin's
    ``follow_radius`` becomes its target. The coin moves ``follow_speed``
    toward that player; if the distance measured before the move is within
    the two radii, the coin is removed, the player's coin count goes up and
    ``coin_collected`` is emitted to the main room.

    Args:
        players: Mapping of player id to player object.
    """
    # Iterate over a snapshot because collected coins are removed in place.
    for coin in list(game_state['synth_coins']):
        target_id = None
        target = None
        best_gap = float('inf')
        for pid, candidate in players.items():
            if candidate.is_paused:
                continue
            gap = math.hypot(candidate.x - coin.x, candidate.y - coin.y)
            if gap < coin.follow_radius and gap < best_gap:
                target_id, target, best_gap = pid, candidate, gap
        if not target:
            continue

        offset_x = target.x - coin.x
        offset_y = target.y - coin.y
        gap = math.hypot(offset_x, offset_y)
        if gap > 0:
            coin.x += (offset_x / gap) * coin.follow_speed
            coin.y += (offset_y / gap) * coin.follow_speed

        # Check for collection
        if gap <= target.radius + coin.radius:
            target.synth_coins += 1
            game_state['synth_coins'].remove(coin)
            payload = {
                'player_id': target_id,
                'coin_count': target.synth_coins,
                'coin_x': coin.x,
                'coin_y': coin.y
            }
            socketio.emit('coin_collected', payload, room=main_room)
_ENEMY_RADIUS = 30  # Collision radius the loop assumes for every enemy.


def _advance_bullets(current_time):
    """Discard bullets older than BULLET_LIFETIME and move the rest one step."""
    game_state['bullets'] = [
        bullet for bullet in game_state['bullets']
        if current_time - bullet['created_at'] < BULLET_LIFETIME
    ]
    for bullet in game_state['bullets']:
        bullet['x'] += math.cos(bullet['angle']) * bullet['speed']
        bullet['y'] += math.sin(bullet['angle']) * bullet['speed']


def _spawn_missing_enemies():
    """Top the enemy list back up to MAX_ENEMIES."""
    missing = MAX_ENEMIES - len(game_state['enemies'])
    if missing > 0:
        game_state['enemies'].extend(generate_enemies(missing))


def _apply_contact_damage(current_time):
    """Run per-player upkeep and damage unpaused players touching an enemy."""
    for player_id, player in game_state['players'].items():
        if player.is_paused:
            continue
        # Passing the current position leaves x/y unchanged; the call is made
        # for the rest of what Player.update() does.
        player.update({'x': player.x, 'y': player.y})
        for enemy in game_state['enemies']:
            distance = math.hypot(player.x - enemy['x'], player.y - enemy['y'])
            if distance < _ENEMY_RADIUS + player.radius:
                player.take_damage(enemy['attributes']['damage'], current_time)
                if player.current_health <= 0:
                    handle_player_death(player_id)


def _move_enemies():
    """Step every enemy toward its nearest unpaused player, resolving overlaps."""
    # The set of unpaused players does not change while enemies move, so it
    # is computed once rather than once per enemy.
    active_players = [p for p in game_state['players'].values() if not p.is_paused]
    if not active_players:
        return
    for enemy in game_state['enemies']:
        nearest_player = min(
            active_players,
            key=lambda p: math.hypot(p.x - enemy['x'], p.y - enemy['y']),
        )
        dx = nearest_player.x - enemy['x']
        dy = nearest_player.y - enemy['y']
        distance = math.hypot(dx, dy)
        new_x = enemy['x']
        new_y = enemy['y']
        if distance > 0:
            new_x += (dx / distance) * enemy['attributes']['speed']
            new_y += (dy / distance) * enemy['attributes']['speed']

        # Push the enemy out of any other enemy it would overlap.
        for other_enemy in game_state['enemies']:
            # Identity, not dict equality: two distinct enemies with equal
            # contents must still collide with each other.
            if other_enemy is enemy:
                continue
            collision_distance = math.hypot(new_x - other_enemy['x'], new_y - other_enemy['y'])
            if collision_distance < _ENEMY_RADIUS:
                bounce_dx = new_x - other_enemy['x']
                bounce_dy = new_y - other_enemy['y']
                bounce_distance = math.hypot(bounce_dx, bounce_dy)
                if bounce_distance > 0:
                    new_x = other_enemy['x'] + (bounce_dx / bounce_distance) * _ENEMY_RADIUS
                    new_y = other_enemy['y'] + (bounce_dy / bounce_distance) * _ENEMY_RADIUS

        # Push the enemy out of any unpaused player it would overlap.
        for player in active_players:
            reach = _ENEMY_RADIUS + player.radius
            collision_distance = math.hypot(new_x - player.x, new_y - player.y)
            if collision_distance < reach:
                bounce_dx = new_x - player.x
                bounce_dy = new_y - player.y
                bounce_distance = math.hypot(bounce_dx, bounce_dy)
                if bounce_distance > 0:
                    new_x = player.x + (bounce_dx / bounce_distance) * reach
                    new_y = player.y + (bounce_dy / bounce_distance) * reach

        enemy['x'] = new_x
        enemy['y'] = new_y


def _award_kill(bullet, enemy):
    """Remove a dead enemy, drop a coin, credit the shooter and respawn."""
    game_state['enemies'].remove(enemy)
    coin = SynthCoin(enemy['x'], enemy['y'])
    game_state['synth_coins'].append(coin)
    socketio.emit('new_synth_coin', {'x': coin.x, 'y': coin.y}, room=main_room)

    # Grant experience to the player who shot the bullet, if still present.
    player_id = bullet.get('player_id')
    if player_id and player_id in game_state['players']:
        player = game_state['players'][player_id]
        exp_gained = enemy['experience']
        leveled_up = add_experience(player, exp_gained)
        socketio.emit('experience_update', {
            'player_id': player_id,
            'experience': player.experience,
            'max_experience': player.max_experience,
            'level': player.level,
            'leveled_up': leveled_up,
            'exp_gained': exp_gained
        }, room=main_room)

    particles = [
        {
            'x': enemy['x'],
            'y': enemy['y'],
            'color': 'red',
            'radius': random.uniform(2, 5),
            'velocityX': random.uniform(-2, 2),
            'velocityY': random.uniform(-2, 2),
            'ttl': 60
        } for _ in range(10)
    ]
    socketio.emit('enemy_destroyed', {'particles': particles}, room=main_room)

    if len(game_state['enemies']) < MAX_ENEMIES:
        game_state['enemies'].extend(generate_enemies(1))


def _resolve_bullet_hits():
    """Apply bullet damage to the first enemy each bullet touches."""
    # Both lists are mutated while scanning, so iterate over snapshots.
    for bullet in game_state['bullets'][:]:
        for enemy in game_state['enemies'][:]:
            hit = math.hypot(bullet['x'] - enemy['x'], bullet['y'] - enemy['y']) < _ENEMY_RADIUS + bullet['size']
            if not hit:
                continue
            damage_dealt = bullet['damage']
            enemy['current_health'] -= damage_dealt
            game_state['bullets'].remove(bullet)
            socketio.emit('bullet_impact', {
                'x': bullet['x'],
                'y': bullet['y'],
                'color': bullet['color'],
                'size': bullet['size'],
                'angle': bullet['angle']
            }, room=main_room)
            socketio.emit('damage_dealt', {
                'damage': damage_dealt,
                'enemyX': enemy['x'],
                'enemyY': enemy['y']
            }, room=bullet['player_id'])
            if enemy['current_health'] <= 0:
                _award_kill(bullet, enemy)
            # The bullet is spent. Testing it against further enemies would
            # try to remove it from the list a second time (ValueError).
            break


def _auto_fire(current_time):
    """Aim and fire the weapons of every unpaused player with auto-fire on."""
    for player_id, player in game_state['players'].items():
        if player.is_paused or not player.auto_fire_enabled:
            continue
        nearest_enemy = find_nearest_enemy(player)
        if not nearest_enemy:
            continue
        target_angle = math.atan2(nearest_enemy['y'] - player.y, nearest_enemy['x'] - player.x)
        for i, weapon in enumerate(player.weapons):
            player.weapon_angles[i] = lerp_angle(player.weapon_angles[i], target_angle, 0.1)
            if current_time - player.last_shoot_times[i] > 1 / weapon['base_attributes']['fire_rate']:
                weapon_x = player.x + math.cos(player.weapon_angles[i]) * player.orbit_radius
                weapon_y = player.y + math.sin(player.weapon_angles[i]) * player.orbit_radius
                bullet = player.shoot(player.weapon_angles[i], weapon_x, weapon_y, i)
                if bullet:
                    game_state['bullets'].append(bullet)
                    socketio.emit('new_bullet', bullet, room=main_room)
        # Broadcast the updated weapon angles.
        socketio.emit('player_update', {'id': player_id, 'player': player.to_dict()}, room=main_room)


def _update_music(current_time):
    """Advance the soundtrack when the current song has finished."""
    # Song rotation driven by the music fields kept in game_state.
    # NOTE(review): this decodes the MP3 on every tick while
    # game_state['is_playing'] is set; consider caching the duration.
    if game_state['is_playing']:
        song_duration = len(AudioSegment.from_mp3(game_state['playlist'][game_state['current_song_index']]))
        if current_time - game_state['music_start_time'] > song_duration / 1000:
            change_song()
    if music_player.is_playing:
        if music_player.update():
            # Song has changed, notify clients
            socketio.emit('music_control', {
                'action': 'change',
                'song': f"/{music_player.get_current_song()}",
                'startTime': music_player.start_time,
                'songDuration': music_player.get_current_song_duration()
            }, room=main_room)


def game_loop():
    """Run the authoritative simulation at roughly 30 ticks per second.

    Each tick, in order: expire and move bullets, top up enemies, apply
    contact damage, move enemies, resolve bullet hits, update synth coins,
    auto-fire weapons, advance the music, expire coins older than 300
    seconds, and broadcast ``game_update`` to the main room.

    Synth coins are updated once per tick. They used to be updated once per
    unpaused player, which made coins move faster as more players joined.

    Any exception is logged with its traceback and ends the loop.
    """
    while True:
        try:
            current_time = time.time()
            _advance_bullets(current_time)
            _spawn_missing_enemies()
            _apply_contact_damage(current_time)
            _move_enemies()
            _resolve_bullet_hits()
            update_synth_coins(game_state['players'])
            _auto_fire(current_time)
            _update_music(current_time)

            game_state['synth_coins'] = [
                coin for coin in game_state['synth_coins']
                if current_time - coin.created_at < 300
            ]

            # Emit game update
            serialized_state = serialize_game_state(game_state)
            serialized_state['synth_coins'] = [{'x': coin.x, 'y': coin.y} for coin in game_state['synth_coins']]
            socketio.emit('game_update', serialized_state, room=main_room)
            socketio.sleep(0.033)  # Update approximately 30 times per second
        except Exception as e:
            logging.exception(f"Error in game loop: {str(e)}")
            break
def lerp_angle(a, b, t):
    """Interpolate from angle ``a`` toward ``b`` along the shortest arc.

    Args:
        a: Start angle in radians.
        b: Target angle in radians.
        t: Interpolation factor; 0 returns ``a``, 1 lands on the target
            (modulo a full turn).

    Returns:
        ``a`` advanced by fraction ``t`` of the signed shortest difference.
    """
    full_turn = 2 * math.pi
    # Wrap the raw difference into [-pi, pi) so the turn takes the short way.
    shortest = (b - a + math.pi) % full_turn - math.pi
    return a + shortest * t
def handle_player_death(player_id):
    """Respawn a dead player at a random spot with full health.

    The new position keeps a 50-unit margin from the map edges. A
    ``player_died`` event carrying the player's id is emitted to the main
    room.

    Args:
        player_id: Key of the player in ``game_state['players']``.
    """
    fallen = game_state['players'][player_id]
    margin = 50
    fallen.x = random.randint(margin, MAP_WIDTH - margin)
    fallen.y = random.randint(margin, MAP_HEIGHT - margin)
    fallen.current_health = fallen.max_health
    socketio.emit('player_died', {'player_id': player_id}, room=main_room)
def find_nearest_enemy(player):
    """Return the closest enemy that is not inside the player's weapon orbit.

    Enemies nearer than ``orbit_radius + targeting_buffer`` are ignored.

    Args:
        player: Player object with ``x``, ``y``, ``orbit_radius`` and
            ``targeting_buffer`` attributes.

    Returns:
        The nearest eligible enemy dict, or None when there is none.
    """
    threshold = player.orbit_radius + player.targeting_buffer
    best_enemy = None
    best_gap = float('inf')
    for candidate in game_state['enemies']:
        gap = math.hypot(candidate['x'] - player.x, candidate['y'] - player.y)
        if not (threshold <= gap < best_gap):
            continue
        best_enemy, best_gap = candidate, gap
    return best_enemy
def create_bullet(player, angle, weapon_x, weapon_y, weapon):
    """Build the bullet dict for a shot fired by ``player`` with ``weapon``.

    Args:
        player: The shooting player; must be registered in
            ``game_state['players']`` so its id can be looked up.
        angle: Travel direction in radians.
        weapon_x: Spawn x coordinate.
        weapon_y: Spawn y coordinate.
        weapon: Weapon dict providing ``base_attributes``, ``weapon_type``,
            ``bullet_image_source`` and optionally ``bullet_color``.

    Returns:
        A dict describing the new bullet.
    """
    stats = weapon['base_attributes']
    bullet = {'x': weapon_x, 'y': weapon_y, 'angle': angle}
    bullet['speed'] = stats['bullet_speed']
    bullet['size'] = stats['bullet_size']
    bullet['damage'] = stats['damage'] * player.damage_multiplier
    bullet['created_at'] = time.time()
    bullet['player_id'] = next(pid for pid, p in game_state['players'].items() if p == player)
    bullet['weapon_type'] = weapon['weapon_type']
    bullet['bullet_image_source'] = weapon['bullet_image_source']
    # Default to yellow if the weapon specifies no colour.
    bullet['color'] = weapon.get('bullet_color', '#ffff00')
    return bullet
# Admin console commands
def start_music():
    """Start playback and tell every client in the main room which song plays."""
    music_player.start()
    payload = {
        'action': 'start',
        'song': music_player.get_current_song(),
        'startTime': music_player.start_time,
        'songDuration': music_player.get_current_song_duration()
    }
    socketio.emit('music_control', payload, room=main_room)
def stop_music():
    """Stop playback and broadcast ``music_stop`` to the main room."""
    music_player.stop()
    socketio.emit('music_stop', room=main_room)
def change_song():
    """Skip to the next playlist entry and announce it to the main room."""
    upcoming = music_player.next_song()
    payload = {
        'action': 'change',
        'song': upcoming,
        'startTime': music_player.start_time,
        'songDuration': music_player.get_current_song_duration()
    }
    socketio.emit('music_control', payload, room=main_room)
class AudioStreamer:
    """Hands out consecutive slices of the music player's current audio."""

    def __init__(self, music_player):
        """Create a stopped streamer positioned at the start of the audio.

        Args:
            music_player: Object exposing ``current_audio`` (sliceable, with
                a length) and ``update()``.
        """
        self.music_player = music_player
        self.current_position = 0
        self.is_playing = False

    def get_next_chunk(self):
        """Return the next CHUNK_SIZE-long slice of the current audio.

        Returns:
            The slice, or None when no audio is loaded yet, or when the
            current audio is exhausted and the player did not move on.
        """
        if self.music_player.current_audio is None:
            # Nothing has been loaded yet; len(None) would raise TypeError.
            return None
        if self.current_position >= len(self.music_player.current_audio):
            if self.music_player.update():
                self.current_position = 0
            else:
                return None
        chunk = self.music_player.current_audio[self.current_position:self.current_position + CHUNK_SIZE]
        self.current_position += CHUNK_SIZE
        return chunk

    def start(self):
        """Mark the streamer as playing."""
        self.is_playing = True

    def stop(self):
        """Mark the streamer as stopped."""
        self.is_playing = False
# Playlist-driven music player: tracks the current song, its start time and its decoded audio
class MusicPlayer:
    """Tracks the playlist position, the song start time and the loaded audio."""

    def __init__(self, playlist):
        """Create a stopped player positioned on the first playlist entry.

        Args:
            playlist: List of paths to MP3 files.
        """
        self.playlist = playlist
        self.current_song_index = 0
        self.start_time = None
        self.is_playing = False
        self.song_durations = self.get_song_durations()
        self.current_audio = None

    def get_song_durations(self):
        """Return a mapping of song filename to duration in seconds.

        Songs that cannot be decoded are logged and given a 180-second
        default so one bad file does not stop the player.
        """
        durations = {}
        for song in self.playlist:
            name = os.path.basename(song)
            try:
                durations[name] = len(AudioSegment.from_mp3(song)) / 1000
            except Exception as e:
                logging.error(f"Error getting duration for {song}: {str(e)}")
                durations[name] = 180  # Default to 3 minutes if there's an error
        return durations

    def _advance(self, now=None):
        """Move to the next playlist entry (wrapping around) and load it.

        Args:
            now: Timestamp to record as the new start time; defaults to the
                current time.
        """
        self.current_song_index = (self.current_song_index + 1) % len(self.playlist)
        self.start_time = time.time() if now is None else now
        self.load_current_song()

    def start(self):
        """Begin playback of the current song and announce it on Discord."""
        self.is_playing = True
        self.start_time = time.time()
        self.load_current_song()
        send_discord_alert(f"🎵 Now playing: {self.get_current_song()}")

    def stop(self):
        """Stop playback and clear the start time."""
        self.is_playing = False
        self.start_time = None

    def get_current_song(self):
        """Return the current song's filename (not the full path)."""
        return os.path.basename(self.playlist[self.current_song_index])

    def get_current_song_duration(self):
        """Return the current song's duration in seconds."""
        return self.song_durations[self.get_current_song()]

    def load_current_song(self):
        """Decode the current song into ``current_audio``."""
        self.current_audio = AudioSegment.from_mp3(self.playlist[self.current_song_index])

    def update(self):
        """Advance to the next song once the current one has played through.

        Returns:
            True if the song changed, otherwise False.
        """
        if not self.is_playing:
            return False
        current_time = time.time()
        if current_time - self.start_time > self.get_current_song_duration():
            self._advance(current_time)
            return True
        return False

    def next_song(self):
        """Skip to the next song, announce it on Discord and return its filename."""
        self._advance()
        send_discord_alert(f"🎵 Now playing: {self.get_current_song()}")
        return self.get_current_song()

    def change_song(self):
        """Skip to the next song and announce it on Discord; returns nothing."""
        self.next_song()

    def reload_playlist(self):
        """Reload the playlist, keeping the current song selected if possible.

        Returns:
            The number of songs in the reloaded playlist.
        """
        new_playlist = load_music_playlist()
        # An empty old playlist has no current song to look for.
        current_song = self.playlist[self.current_song_index] if self.playlist else None
        if current_song in new_playlist:
            self.current_song_index = new_playlist.index(current_song)
        else:
            self.current_song_index = 0  # Reset to the first song if current song not found
        self.playlist = new_playlist
        self.song_durations = self.get_song_durations()
        return len(self.playlist)
# Initialize the music player with the dynamically loaded playlist
music_player = MusicPlayer(load_music_playlist())
# The audio streamer reads its audio from this shared music player
audio_streamer = AudioStreamer(music_player)
class Player:
    """Server-side state of one connected player."""

    def __init__(self, x, y, name, skin, weapon, weapons, player_class):
        """Create a player at full health with its starting weapon.

        Args:
            x: Initial x position.
            y: Initial y position.
            name: Display name.
            skin: Skin data passed through to clients.
            weapon: Starting weapon dict, or a falsy value for none.
            weapons: Accepted for interface compatibility. The equipped list
                is built from ``weapon`` only.
            player_class: Class dict with ``base_attributes`` (``health``,
                ``speed``, ``damage_multiplier``, optional ``health_regen``)
                and an optional ``weapon_limit``.
        """
        self.x = x
        self.y = y
        self.radius = 20
        self.targeting_buffer = 10
        self.name = name
        self.skin = skin
        self.weapon = weapon
        self.weapons = [weapon] if weapon else []
        # One angle and one cooldown timestamp per equipped weapon. These are
        # sized from the equipped list (not the ``weapons`` argument) so that
        # index i is always valid for all three lists.
        self.weapon_angles = [0] * len(self.weapons)
        self.last_shoot_times = [0] * len(self.weapons)
        self.weapon_limit = int(player_class.get('weapon_limit', 1))  # Ensure this is an integer
        self.weapon_angle = 0
        self.orbit_radius = 40
        self.player_class = player_class
        self.health = player_class['base_attributes']['health']
        self.speed = player_class['base_attributes']['speed']
        self.damage_multiplier = player_class['base_attributes']['damage_multiplier']
        self.level = 1
        self.experience = 0
        self.max_experience = 100
        self.last_shot_time = 0
        self.is_paused = False
        self.synth_coins = 0
        self.max_health = player_class['base_attributes']['health']
        self.current_health = self.max_health
        self.health_regen = player_class['base_attributes'].get('health_regen', 1)  # Default to 1 if not specified
        self.invincibility_duration = 1  # 1 second of invincibility after taking damage
        self.last_damage_time = 0
        self.regen_cooldown = 5  # 5 seconds cooldown before health regeneration starts
        self.auto_fire_enabled = False  # Default to False

    def to_dict(self):
        """Return a client-facing dict of the player's state.

        Weapon and class dicts are passed through ``_serialize_dict`` so any
        ObjectId values become strings.
        """
        return {
            'x': self.x,
            'y': self.y,
            'name': self.name,
            'skin': self.skin,
            'weapon': self._serialize_dict(self.weapon),
            'weapon_angle': self.weapon_angle,
            'player_class': self._serialize_dict(self.player_class),
            'health': self.health,
            'speed': self.speed,
            'damage_multiplier': self.damage_multiplier,
            'level': self.level,
            'experience': self.experience,
            'max_experience': self.max_experience,
            'last_shot_time': self.last_shot_time,
            'is_paused': self.is_paused,
            'weapons': [self._serialize_dict(w) for w in self.weapons],
            'weapon_angles': self.weapon_angles,
            'weapon_limit': self.weapon_limit,
            'current_health': self.current_health,
            'max_health': self.max_health,
            'auto_fire_enabled': self.auto_fire_enabled
        }

    def add_weapon(self, weapon):
        """Equip another weapon unless the class weapon limit is reached."""
        if len(self.weapons) < self.weapon_limit:
            self.weapons.append(weapon)
            self.weapon_angles.append(0)
            self.last_shoot_times.append(0)

    def shoot(self, angle, weapon_x, weapon_y, weapon_index):
        """Fire the given weapon if its cooldown has elapsed.

        Args:
            angle: Bullet direction in radians.
            weapon_x: Bullet spawn x coordinate.
            weapon_y: Bullet spawn y coordinate.
            weapon_index: Index into ``self.weapons``.

        Returns:
            The new bullet dict, or None if the index is invalid or the
            weapon is still cooling down.
        """
        if weapon_index < 0 or weapon_index >= len(self.weapons):
            return None
        weapon = self.weapons[weapon_index]
        current_time = time.time()
        cooldown = 1 / weapon['base_attributes']['fire_rate']
        if current_time - self.last_shoot_times[weapon_index] > cooldown:
            bullet = create_bullet(self, angle, weapon_x, weapon_y, weapon)
            self.last_shoot_times[weapon_index] = current_time
            return bullet
        return None

    def _serialize_dict(self, d):
        """Return a copy of ``d`` with every nested ObjectId turned into a string."""
        if isinstance(d, dict):
            return {k: self._serialize_dict(v) for k, v in d.items()}
        if isinstance(d, list):
            return [self._serialize_dict(v) for v in d]
        if isinstance(d, ObjectId):
            return str(d)
        return d

    def update(self, data):
        """Apply the supplied state fields and regenerate health when allowed.

        Recognised keys in ``data``: ``x``, ``y``, ``weapon_angle``,
        ``weapon_angles`` and ``auto_fire_enabled``. Health regenerates on
        each call made more than ``regen_cooldown`` seconds after the last
        damage, provided the player is not paused.
        """
        current_time = time.time()
        if 'x' in data:
            self.x = data['x']
        if 'y' in data:
            self.y = data['y']
        if 'weapon_angle' in data:
            self.weapon_angle = data['weapon_angle']
        if 'weapon_angles' in data:
            self.weapon_angles = data['weapon_angles']
        if current_time - self.last_damage_time > self.regen_cooldown:
            if not self.is_paused:
                self.regenerate_health()
        if 'auto_fire_enabled' in data:
            self.auto_fire_enabled = data['auto_fire_enabled']

    def take_damage(self, damage, current_time):
        """Subtract ``damage`` unless the post-hit invincibility is still active.

        Args:
            damage: Health points to remove.
            current_time: Timestamp of the hit, in seconds.
        """
        if current_time - self.last_damage_time > self.invincibility_duration:
            # Ensure health doesn't go below 0.
            self.current_health = max(0, self.current_health - damage)
            self.last_damage_time = current_time

    def regenerate_health(self):
        """Restore ``health_regen`` points, capped at ``max_health``."""
        self.current_health = min(self.current_health + self.health_regen, self.max_health)

    def add_experience(self, amount):
        """Add experience, levelling up as many times as the total allows.

        Returns:
            True if at least one level was gained, otherwise False.
        """
        self.experience += amount
        leveled_up = False
        while self.experience >= self.max_experience:
            self.level += 1
            self.experience -= self.max_experience
            self.max_experience = int(self.max_experience * 1.2)
            leveled_up = True
        return leveled_up
def stream_audio():
    """Background task that pushes MP3-encoded audio chunks to the main room.

    Runs forever. While the streamer is playing, the next chunk is exported
    to MP3 and emitted as ``audio_chunk``. The loop sleeps on every pass,
    playing or not, so it always yields to other tasks.
    """
    while True:
        if audio_streamer.is_playing:
            chunk = audio_streamer.get_next_chunk()
            # get_next_chunk() returns None when there is nothing to send;
            # exporting None would raise and end the task.
            if chunk is not None:
                buffer = io.BytesIO()
                chunk.export(buffer, format="mp3")
                audio_data = buffer.getvalue()
                socketio.emit('audio_chunk', {'data': audio_data}, room=main_room)
        socketio.sleep(CHUNK_SIZE / 44100)  # Sleep for the duration of the chunk
class AdminConsole(cmd.Cmd):
    """Interactive operator console for music control and player alerts."""

    prompt = 'game_admin> '

    def _broadcast_song_change(self, song):
        """Tell every client in the main room that ``song`` is now playing."""
        socketio.emit('music_control', {
            'action': 'change',
            'song': song,
            'startTime': music_player.start_time,
            'songDuration': music_player.get_current_song_duration()
        }, room=main_room)

    def do_start_music(self, arg):
        """Start the music"""
        start_music()

    def do_stop_music(self, arg):
        """Stop the music"""
        stop_music()

    def do_change_song(self, arg):
        """Change the current song: change_song <filename>"""
        requested = arg.strip()
        if not requested:
            print("Error: Song filename is required")
            return
        # The module-level change_song() takes no argument, so the requested
        # file is selected on the music player directly.
        for index, path in enumerate(music_player.playlist):
            if os.path.basename(path) == requested:
                break
        else:
            print(f"Error: Song '{requested}' not found in playlist")
            return
        music_player.current_song_index = index
        music_player.start_time = time.time()
        music_player.load_current_song()
        current = music_player.get_current_song()
        send_discord_alert(f"🎵 Now playing: {current}")
        print(f"Changed to song: {current}")
        self._broadcast_song_change(current)

    def do_quit(self, arg):
        """Exit the admin console"""
        print("Exiting admin console")
        return True

    def do_next_song(self, arg):
        """Change to the next song in the playlist"""
        new_song = music_player.next_song()
        print(f"Changed to next song: {new_song}")
        self._broadcast_song_change(new_song)

    def do_send_alert(self, arg):
        """Send an alert to all players: send_alert <message>"""
        if arg:
            send_alert(arg)
            print(f"Alert sent: {arg}")
            send_discord_alert(arg)
        else:
            print("Error: Please provide a message for the alert")

    def do_reload_playlist(self, arg):
        """Reload the music playlist"""
        song_count = music_player.reload_playlist()
        print(f"Playlist reloaded. {song_count} songs available.")
        # Move on to the next song so the reloaded playlist takes effect.
        new_song = music_player.next_song()
        print(f"Changed to next song: {new_song}")
        self._broadcast_song_change(new_song)
def admin_cli():
    """Run the admin console until it exits, logging any error that ends it."""
    try:
        console = AdminConsole()
        console.cmdloop()
    except Exception as exc:
        logging.error(f"Error in admin console: {str(exc)}")
if __name__ == '__main__':
    # Purge socket records that have been inactive for more than five minutes.
    cleanup_stale_sockets()
    # Run the admin console on a daemon thread so it never keeps the process alive.
    cli_thread = threading.Thread(target=admin_cli)
    cli_thread.daemon = True
    cli_thread.start()
    # Start the music automatically
    start_music()
    # Simulation and audio streaming run as Socket.IO background tasks.
    game_loop_thread = socketio.start_background_task(game_loop)
    audio_stream_thread = socketio.start_background_task(stream_audio)
    try:
        # Serve over HTTPS on port 443 using the certificate files below.
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ssl_context.load_cert_chain(
            'C:/Certbot/live/resonancerumble.com/fullchain.pem',
            'C:/Certbot/live/resonancerumble.com/privkey.pem'
        )
        send_discord_alert("🚀 Resonance Rumble server has started!")
        socketio.run(app, host='0.0.0.0', port=443, ssl_context=ssl_context)
    except KeyboardInterrupt:
        logging.info("Server shutting down...")
    finally:
        # Perform cleanup
        logging.info("Cleaning up resources...")
        # Close the MongoDB client if it was created at module level.
        if 'client' in globals():
            client.close()
        socketio.stop()