Merge branch 'v2.4.4' into stable

This commit is contained in:
usmannasir 2025-10-27 19:54:08 +05:00
commit 99b0476358
37 changed files with 4620 additions and 3053 deletions

View File

@ -5,7 +5,7 @@
# Django Configuration
SECRET_KEY=your_very_long_random_secret_key_here_minimum_50_characters
DEBUG=False
ALLOWED_HOSTS=localhost,127.0.0.1,yourdomain.com
ALLOWED_HOSTS=*
# Database Configuration - CyberPanel Database
DB_NAME=cyberpanel

View File

@ -4,9 +4,7 @@
<option name="autoReloadType" value="SELECTIVE" />
</component>
<component name="ChangeListManager">
<list default="true" id="5251c5c9-f2a1-41f2-bc76-10b517091df1" name="Changes" comment="">
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
</list>
<list default="true" id="5251c5c9-f2a1-41f2-bc76-10b517091df1" name="Changes" comment="" />
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
@ -48,7 +46,7 @@
"RunOnceActivity.TerminalTabsStorage.copyFrom.TerminalArrangementManager": "true",
"RunOnceActivity.git.unshallow": "true",
"SHELLCHECK.PATH": "/Users/cyberpersons/Library/Application Support/JetBrains/PyCharm2025.1/plugins/Shell Script/shellcheck",
"git-widget-placeholder": "v2.4.4-dev",
"git-widget-placeholder": "stable",
"last_opened_file_path": "/Users/cyberpersons/cyberpanel",
"node.js.detected.package.eslint": "true",
"node.js.detected.package.tslint": "true",
@ -118,6 +116,7 @@
<workItem from="1754433799097" duration="517000" />
<workItem from="1754448353513" duration="2970000" />
<workItem from="1754511414251" duration="48713000" />
<workItem from="1756885332634" duration="10552000" />
</task>
<servers />
</component>

View File

@ -192,9 +192,13 @@ class secMiddleware:
pathActual.find('/api/') > -1 or pathActual.find('aiscanner/scheduled-scans') > -1)
if isAPIEndpoint:
# Skip validation for fields that contain legitimate code/scripts
if key == 'content' or key == 'fileContent' or key == 'configData' or key == 'rewriteRules' or key == 'modSecRules' or key == 'contentNow' or key == 'emailMessage':
continue
# For API endpoints, still check for the most dangerous command injection characters
if isinstance(value, (str, bytes)) and (value.find('- -') > -1 or value.find('\n') > -1 or value.find(';') > -1 or
value.find('&&') > -1 or value.find('||') > -1 or value.find('|') > -1 or
if isinstance(value, (str, bytes)) and (value.find('- -') > -1 or value.find('\n') > -1 or value.find(';') > -1 or
value.find('&&') > -1 or value.find('||') > -1 or value.find('|') > -1 or
value.find('...') > -1 or value.find("`") > -1 or value.find("$") > -1 or
value.find('../') > -1 or value.find('../../') > -1):
logging.writeToFile(request.body)
@ -212,7 +216,7 @@ class secMiddleware:
or key == 'emailMessage' or key == 'configData' or key == 'rewriteRules' \
or key == 'modSecRules' or key == 'recordContentTXT' or key == 'SecAuditLogRelevantStatus' \
or key == 'fileContent' or key == 'commands' or key == 'gitHost' or key == 'ipv6' or key == 'contentNow' \
or key == 'time_of_day' or key == 'notification_emails' or key == 'domains':
or key == 'time_of_day' or key == 'notification_emails' or key == 'domains' or key == 'content':
continue
# Skip validation for API endpoints that need JSON structure characters

View File

@ -33,7 +33,7 @@ SECRET_KEY = os.getenv('SECRET_KEY', 'xr%j*p!*$0d%(-(e%@-*hyoz4$f%y77coq0u)6pwmj
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = os.getenv('DEBUG', 'False').lower() == 'true'
# Allow configuration via environment variable, fallback to wildcard for backward compatibility
# Allow configuration via environment variable, with wildcard fallback for universal compatibility
ALLOWED_HOSTS = os.getenv('ALLOWED_HOSTS', '*').split(',')
# Application definition

View File

@ -304,11 +304,34 @@ class AIScannerManager:
self.logger.writeToFile(f'[AIScannerManager.startScan] VPS eligible for free scans, getting API key for IP: {server_ip}')
vps_key_data = self.get_or_create_vps_api_key(server_ip)
if vps_key_data:
vps_api_key = vps_key_data.get('api_key')
free_scans_remaining = vps_key_data.get('free_scans_remaining', 0)
self.logger.writeToFile(f'[AIScannerManager.startScan] VPS API key obtained, {free_scans_remaining} free scans remaining')
# Save VPS API key to database for future operations (file fixes, etc.)
try:
scanner_settings, created = AIScannerSettings.objects.get_or_create(
admin=admin,
defaults={
'api_key': vps_api_key,
'balance': 0.0000,
'is_payment_configured': True # VPS accounts have implicit payment
}
)
# Update existing settings if API key is different or empty
if not created and (not scanner_settings.api_key or scanner_settings.api_key != vps_api_key):
scanner_settings.api_key = vps_api_key
scanner_settings.is_payment_configured = True
scanner_settings.save()
self.logger.writeToFile(f'[AIScannerManager.startScan] Updated VPS API key in database')
elif created:
self.logger.writeToFile(f'[AIScannerManager.startScan] Saved new VPS API key to database')
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.startScan] Error saving VPS API key: {str(e)}')
# Continue even if saving fails - scan can still proceed
else:
self.logger.writeToFile(f'[AIScannerManager.startScan] Failed to get VPS API key')
return JsonResponse({'success': False, 'error': 'Failed to authenticate VPS for free scans'})
@ -492,6 +515,12 @@ class AIScannerManager:
if vps_key_data and vps_key_data.get('api_key'):
# Use VPS API key for adding payment method
api_key_to_use = vps_key_data.get('api_key')
# Save VPS API key to database
scanner_settings.api_key = api_key_to_use
scanner_settings.is_payment_configured = True
scanner_settings.save()
self.logger.writeToFile(f'[AIScannerManager.addPaymentMethod] Saved VPS API key to database')
else:
return JsonResponse({'success': False, 'error': 'Failed to authenticate VPS'})
else:
@ -510,6 +539,15 @@ class AIScannerManager:
if vps_key_data and vps_key_data.get('api_key'):
# Use VPS API key for adding payment method
api_key_to_use = vps_key_data.get('api_key')
# Create scanner settings with VPS API key
AIScannerSettings.objects.create(
admin=admin,
api_key=api_key_to_use,
balance=0.0000,
is_payment_configured=True
)
self.logger.writeToFile(f'[AIScannerManager.addPaymentMethod] Created new scanner settings with VPS API key')
else:
return JsonResponse({'success': False, 'error': 'Failed to authenticate VPS'})
else:

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,43 @@
-- AI Scanner File Operations Audit Tables
-- These tables track file operations performed by the scanner for security and auditing
-- Drop tables if they exist (use with caution in production)
-- DROP TABLE IF EXISTS scanner_file_operations;
-- DROP TABLE IF EXISTS scanner_api_rate_limits;
-- Table: scanner_file_operations
-- Tracks all file operations (backup, read, replace, rename, delete)
CREATE TABLE IF NOT EXISTS scanner_file_operations (
id INT AUTO_INCREMENT PRIMARY KEY,
scan_id VARCHAR(255) NOT NULL,
operation VARCHAR(20) NOT NULL,
file_path VARCHAR(500) NOT NULL,
backup_path VARCHAR(500) NULL,
success BOOLEAN NOT NULL DEFAULT FALSE,
error_message TEXT NULL,
ip_address VARCHAR(45) NULL,
user_agent VARCHAR(255) NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_scan_id (scan_id),
INDEX idx_created_at (created_at),
INDEX idx_scan_created (scan_id, created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Table: scanner_api_rate_limits
-- Rate limiting for scanner API endpoints
CREATE TABLE IF NOT EXISTS scanner_api_rate_limits (
id INT AUTO_INCREMENT PRIMARY KEY,
scan_id VARCHAR(255) NOT NULL,
endpoint VARCHAR(100) NOT NULL,
request_count INT NOT NULL DEFAULT 0,
last_request_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY unique_scan_endpoint (scan_id, endpoint),
INDEX idx_scan_endpoint (scan_id, endpoint)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Show created tables
SHOW TABLES LIKE 'scanner_%';
-- Show table structures
DESCRIBE scanner_file_operations;
DESCRIBE scanner_api_rate_limits;

View File

@ -272,7 +272,7 @@ class ScheduledScanExecution(models.Model):
('failed', 'Failed'),
('cancelled', 'Cancelled'),
]
scheduled_scan = models.ForeignKey(ScheduledScan, on_delete=models.CASCADE, related_name='executions')
execution_time = models.DateTimeField(auto_now_add=True)
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
@ -285,14 +285,14 @@ class ScheduledScanExecution(models.Model):
error_message = models.TextField(blank=True, null=True)
started_at = models.DateTimeField(null=True, blank=True)
completed_at = models.DateTimeField(null=True, blank=True)
class Meta:
db_table = 'ai_scanner_scheduled_executions'
ordering = ['-execution_time']
def __str__(self):
return f"Execution of {self.scheduled_scan.name} at {self.execution_time}"
@property
def scanned_domains(self):
"""Parse domains scanned JSON"""
@ -302,7 +302,7 @@ class ScheduledScanExecution(models.Model):
except json.JSONDecodeError:
return []
return []
@property
def scan_id_list(self):
"""Parse scan IDs JSON"""
@ -312,11 +312,60 @@ class ScheduledScanExecution(models.Model):
except json.JSONDecodeError:
return []
return []
def set_scanned_domains(self, domain_list):
"""Set scanned domains from list"""
self.domains_scanned = json.dumps(domain_list)
def set_scan_ids(self, scan_id_list):
"""Set scan IDs from list"""
self.scan_ids = json.dumps(scan_id_list)
class ScannerFileOperation(models.Model):
"""Audit log for file operations performed by scanner"""
OPERATION_CHOICES = [
('backup', 'Backup'),
('read', 'Read'),
('replace', 'Replace'),
('rename', 'Rename'),
('delete', 'Delete'),
]
scan_id = models.CharField(max_length=255, db_index=True)
operation = models.CharField(max_length=20, choices=OPERATION_CHOICES)
file_path = models.CharField(max_length=500)
backup_path = models.CharField(max_length=500, blank=True, null=True)
success = models.BooleanField(default=False)
error_message = models.TextField(blank=True, null=True)
ip_address = models.CharField(max_length=45, blank=True, null=True)
user_agent = models.CharField(max_length=255, blank=True, null=True)
created_at = models.DateTimeField(auto_now_add=True, db_index=True)
class Meta:
db_table = 'scanner_file_operations'
ordering = ['-created_at']
indexes = [
models.Index(fields=['scan_id', 'created_at']),
]
def __str__(self):
return f"{self.operation} - {self.file_path} ({'success' if self.success else 'failed'})"
class ScannerAPIRateLimit(models.Model):
"""Rate limiting for scanner API endpoints"""
scan_id = models.CharField(max_length=255)
endpoint = models.CharField(max_length=100)
request_count = models.IntegerField(default=0)
last_request_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'scanner_api_rate_limits'
unique_together = ['scan_id', 'endpoint']
indexes = [
models.Index(fields=['scan_id', 'endpoint']),
]
def __str__(self):
return f"{self.scan_id} - {self.endpoint}: {self.request_count} requests"

View File

@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""
Test endpoint to debug API key validation for AI Scanner
Add this to your aiScanner/urls.py:
path('api/test-auth/', test_api_endpoint.test_auth, name='test_auth'),
"""
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
import json
from .api import validate_access_token, extract_auth_token
from .models import AIScannerSettings, ScanHistory
from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging
@csrf_exempt
@require_http_methods(['POST'])
def test_auth(request):
"""
Test endpoint to validate API authentication
Usage:
curl -X POST http://localhost:8001/api/ai-scanner/test-auth/ \
-H "X-API-Key: cp_your_api_key_here" \
-H "X-Scan-ID: your-scan-id" \
-H "Content-Type: application/json" \
-d '{"scan_id": "your-scan-id"}'
"""
try:
# Parse request
data = json.loads(request.body) if request.body else {}
scan_id = data.get('scan_id', '') or request.META.get('HTTP_X_SCAN_ID', '')
# Extract authentication token
access_token, auth_type = extract_auth_token(request)
response = {
'auth_type_detected': auth_type,
'token_prefix': access_token[:20] + '...' if access_token else None,
'scan_id': scan_id,
'validation_steps': []
}
if not access_token:
response['error'] = 'No authentication token found'
response['validation_steps'].append('FAILED: No Bearer token or X-API-Key header found')
return JsonResponse(response, status=401)
if not scan_id:
response['error'] = 'No scan_id provided'
response['validation_steps'].append('FAILED: No scan_id in body or X-Scan-ID header')
return JsonResponse(response, status=400)
# Check if API key exists in database
response['validation_steps'].append(f'Checking if token {access_token[:20]}... exists in database')
api_key_exists = AIScannerSettings.objects.filter(api_key=access_token).exists()
response['api_key_exists'] = api_key_exists
if api_key_exists:
response['validation_steps'].append('SUCCESS: API key found in AIScannerSettings')
# Get the admin who owns this API key
settings = AIScannerSettings.objects.get(api_key=access_token)
response['api_key_owner'] = settings.admin.userName
response['validation_steps'].append(f'API key belongs to admin: {settings.admin.userName}')
else:
response['validation_steps'].append('WARNING: API key not found in AIScannerSettings')
# Check if scan exists
response['validation_steps'].append(f'Checking if scan {scan_id} exists')
try:
scan = ScanHistory.objects.get(scan_id=scan_id)
response['scan_exists'] = True
response['scan_domain'] = scan.domain
response['scan_admin'] = scan.admin.userName
response['scan_status'] = scan.status
response['validation_steps'].append(f'SUCCESS: Scan found for domain {scan.domain}, admin {scan.admin.userName}')
except ScanHistory.DoesNotExist:
response['scan_exists'] = False
response['validation_steps'].append('WARNING: Scan not found in database')
# Now validate using the actual validation function
response['validation_steps'].append('Running validate_access_token() function...')
auth_wrapper, error = validate_access_token(access_token, scan_id)
if error:
response['validation_error'] = error
response['validation_success'] = False
response['validation_steps'].append(f'FAILED: {error}')
return JsonResponse(response, status=401)
else:
response['validation_success'] = True
response['auth_wrapper'] = {
'domain': auth_wrapper.domain,
'wp_path': auth_wrapper.wp_path,
'auth_type': auth_wrapper.auth_type,
'external_app': auth_wrapper.external_app
}
response['validation_steps'].append(f'SUCCESS: Token validated as {auth_wrapper.auth_type}')
return JsonResponse(response)
except Exception as e:
logging.writeToFile(f'[API TEST] Error: {str(e)}')
return JsonResponse({
'error': str(e),
'validation_steps': ['EXCEPTION: ' + str(e)]
}, status=500)
@csrf_exempt
@require_http_methods(['GET'])
def list_api_keys(request):
"""
Debug endpoint to list all API keys in the system
Usage:
curl http://localhost:8001/api/ai-scanner/list-api-keys/
"""
try:
api_keys = []
for settings in AIScannerSettings.objects.all():
api_keys.append({
'admin': settings.admin.userName,
'api_key_prefix': settings.api_key[:20] + '...' if settings.api_key else 'None',
'balance': float(settings.balance),
'is_payment_configured': settings.is_payment_configured
})
recent_scans = []
for scan in ScanHistory.objects.all()[:5]:
recent_scans.append({
'scan_id': scan.scan_id,
'domain': scan.domain,
'admin': scan.admin.userName,
'status': scan.status,
'started_at': scan.started_at.isoformat() if scan.started_at else None
})
return JsonResponse({
'api_keys': api_keys,
'recent_scans': recent_scans
})
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)

View File

@ -265,9 +265,29 @@ def getPlatformMonitorUrl(request, scan_id):
vps_info.get('free_scans_available', 0) > 0):
vps_key_data = sm.get_or_create_vps_api_key(server_ip)
if vps_key_data and vps_key_data.get('api_key'):
api_key = vps_key_data.get('api_key')
# Save VPS API key to database for future operations
try:
admin = Administrator.objects.get(pk=userID)
scanner_settings, created = AIScannerSettings.objects.get_or_create(
admin=admin,
defaults={
'api_key': api_key,
'balance': 0.0000,
'is_payment_configured': True
}
)
if not created and (not scanner_settings.api_key or scanner_settings.api_key != api_key):
scanner_settings.api_key = api_key
scanner_settings.is_payment_configured = True
scanner_settings.save()
logging.writeToFile(f"[AI Scanner] Updated VPS API key in database")
except Exception as save_error:
logging.writeToFile(f"[AI Scanner] Error saving VPS API key: {str(save_error)}")
except Exception as e:
pass

View File

@ -40,4 +40,15 @@ urlpatterns = [
re_path(r'^ai-scanner/status-webhook$', views.aiScannerStatusWebhook, name='aiScannerStatusWebhookAPI'),
re_path(r'^ai-scanner/callback/status-webhook$', views.aiScannerStatusWebhook, name='aiScannerStatusWebhookCallbackAPI'), # Alternative URL for worker compatibility
re_path(r'^ai-scanner/scan/(?P<scan_id>[^/]+)/live-progress$', views.aiScannerLiveProgress, name='aiScannerLiveProgressAPI'),
# File operation endpoints for AI Scanner
re_path(r'^scanner/backup-file$', views.scannerBackupFile, name='scannerBackupFileAPI'),
re_path(r'^scanner/get-file$', views.scannerGetFile, name='scannerGetFileAPI'),
re_path(r'^scanner/replace-file$', views.scannerReplaceFile, name='scannerReplaceFileAPI'),
re_path(r'^scanner/rename-file$', views.scannerRenameFile, name='scannerRenameFileAPI'),
re_path(r'^scanner/delete-file$', views.scannerDeleteFile, name='scannerDeleteFileAPI'),
# Debug endpoints for testing API authentication (remove in production)
re_path(r'^ai-scanner/test-auth$', views.testAuthDebug, name='testAuthDebugAPI'),
re_path(r'^ai-scanner/list-api-keys$', views.listApiKeysDebug, name='listApiKeysDebugAPI'),
]

View File

@ -915,3 +915,85 @@ def aiScannerLiveProgress(request, scan_id):
logging.writeToFile(f'[API] AI Scanner live progress error: {str(e)}')
data_ret = {'error': 'Live progress service unavailable'}
return HttpResponse(json.dumps(data_ret), status=500)
# AI Scanner File Operation endpoints
@csrf_exempt
def scannerBackupFile(request):
"""Scanner backup file endpoint"""
try:
from aiScanner.api import scanner_backup_file
return scanner_backup_file(request)
except Exception as e:
logging.writeToFile(f'[API] Scanner backup file error: {str(e)}')
data_ret = {'error': 'Backup file service unavailable'}
return HttpResponse(json.dumps(data_ret), status=500)
@csrf_exempt
def scannerGetFile(request):
"""Scanner get file endpoint"""
try:
from aiScanner.api import scanner_get_file
return scanner_get_file(request)
except Exception as e:
logging.writeToFile(f'[API] Scanner get file error: {str(e)}')
data_ret = {'error': 'Get file service unavailable'}
return HttpResponse(json.dumps(data_ret), status=500)
@csrf_exempt
def scannerReplaceFile(request):
"""Scanner replace file endpoint"""
try:
from aiScanner.api import scanner_replace_file
return scanner_replace_file(request)
except Exception as e:
logging.writeToFile(f'[API] Scanner replace file error: {str(e)}')
data_ret = {'error': 'Replace file service unavailable'}
return HttpResponse(json.dumps(data_ret), status=500)
@csrf_exempt
def scannerRenameFile(request):
"""Scanner rename file endpoint"""
try:
from aiScanner.api import scanner_rename_file
return scanner_rename_file(request)
except Exception as e:
logging.writeToFile(f'[API] Scanner rename file error: {str(e)}')
data_ret = {'error': 'Rename file service unavailable'}
return HttpResponse(json.dumps(data_ret), status=500)
@csrf_exempt
def scannerDeleteFile(request):
"""Scanner delete file endpoint"""
try:
from aiScanner.api import scanner_delete_file
return scanner_delete_file(request)
except Exception as e:
logging.writeToFile(f'[API] Scanner delete file error: {str(e)}')
data_ret = {'error': 'Delete file service unavailable'}
return HttpResponse(json.dumps(data_ret), status=500)
# Debug endpoints for testing API authentication (remove in production)
def testAuthDebug(request):
"""Test endpoint to debug API authentication"""
try:
from aiScanner.test_api_endpoint import test_auth
return test_auth(request)
except Exception as e:
logging.writeToFile(f'[API] Test auth debug error: {str(e)}')
return HttpResponse(json.dumps({'error': str(e)}), status=500)
def listApiKeysDebug(request):
"""Debug endpoint to list API keys in system"""
try:
from aiScanner.test_api_endpoint import list_api_keys
return list_api_keys(request)
except Exception as e:
logging.writeToFile(f'[API] List API keys debug error: {str(e)}')
return HttpResponse(json.dumps({'error': str(e)}), status=500)

View File

@ -2119,8 +2119,53 @@ class BackupManager:
websitesName = ACLManager.findAllSites(currentACL, userID)
proc = httpProc(request, 'backup/OneClickBackupSchedule.html', {'destination': NormalBackupDests.objects.get(name=ocb.sftpUser).name, 'websites': websitesName},
'scheduleBackups')
# Fetch storage stats and backup info from platform API
storage_info = {
'total_storage': 'N/A',
'used_storage': 'N/A',
'available_storage': 'N/A',
'usage_percentage': 0,
'last_backup_run': 'Never',
'last_backup_status': 'N/A',
'total_backups': 0,
'failed_backups': 0,
'error_logs': []
}
try:
import requests
url = 'https://platform.cyberpersons.com/Billing/GetBackupStats'
payload = {
'sub': ocb.subscription,
'sftpUser': ocb.sftpUser,
'serverIP': ACLManager.fetchIP()
}
headers = {'Content-Type': 'application/json'}
response = requests.post(url, headers=headers, data=json.dumps(payload), timeout=30)
if response.status_code == 200:
api_data = response.json()
if api_data.get('status') == 1:
storage_info = api_data.get('data', storage_info)
logging.CyberCPLogFileWriter.writeToFile(f'Successfully fetched backup stats for {ocb.sftpUser} [ManageOCBackups]')
else:
logging.CyberCPLogFileWriter.writeToFile(f'Platform API returned error: {api_data.get("error_message")} [ManageOCBackups]')
else:
logging.CyberCPLogFileWriter.writeToFile(f'Platform API returned HTTP {response.status_code} [ManageOCBackups]')
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Failed to fetch backup stats: {str(e)} [ManageOCBackups]')
context = {
'destination': NormalBackupDests.objects.get(name=ocb.sftpUser).name,
'websites': websitesName,
'storage_info': storage_info,
'ocb_subscription': ocb.subscription,
'ocb_plan_name': ocb.planName,
'ocb_sftp_user': ocb.sftpUser
}
proc = httpProc(request, 'backup/OneClickBackupSchedule.html', context, 'scheduleBackups')
return proc.render()
def RestoreOCBackups(self, request=None, userID=None, data=None):
@ -2243,6 +2288,7 @@ class BackupManager:
return proc.render()
def fetchOCSites(self, request=None, userID=None, data=None):
ssh = None
try:
userID = request.session['userID']
currentACL = ACLManager.loadedACL(userID)
@ -2253,47 +2299,143 @@ class BackupManager:
admin = Administrator.objects.get(pk=userID)
from IncBackups.models import OneClickBackups
ocb = OneClickBackups.objects.get(pk = id, owner=admin)
# Load the private key
try:
ocb = OneClickBackups.objects.get(pk=id, owner=admin)
except OneClickBackups.DoesNotExist:
logging.CyberCPLogFileWriter.writeToFile(f"OneClickBackup with id {id} not found for user {userID} [fetchOCSites]")
data_ret = {'status': 0, 'error_message': 'Backup plan not found or you do not have permission to access it.'}
return HttpResponse(json.dumps(data_ret))
nbd = NormalBackupDests.objects.get(name=ocb.sftpUser)
ip = json.loads(nbd.config)['ip']
# Load backup destination configuration
try:
nbd = NormalBackupDests.objects.get(name=ocb.sftpUser)
ip = json.loads(nbd.config)['ip']
except NormalBackupDests.DoesNotExist:
logging.CyberCPLogFileWriter.writeToFile(f"Backup destination {ocb.sftpUser} not found [fetchOCSites]")
data_ret = {'status': 0, 'error_message': 'Backup destination not configured. Please deploy your backup account first.'}
return HttpResponse(json.dumps(data_ret))
except (KeyError, json.JSONDecodeError) as e:
logging.CyberCPLogFileWriter.writeToFile(f"Invalid backup destination config for {ocb.sftpUser}: {str(e)} [fetchOCSites]")
data_ret = {'status': 0, 'error_message': 'Backup destination configuration is invalid. Please reconfigure your backup account.'}
return HttpResponse(json.dumps(data_ret))
# Connect to the remote server using the private key
# Read and validate SSH private key
private_key_path = '/root/.ssh/cyberpanel'
# Check if SSH key exists
check_exists = ProcessUtilities.outputExecutioner(f'test -f {private_key_path} && echo "EXISTS" || echo "NOT_EXISTS"').strip()
if check_exists == "NOT_EXISTS":
logging.CyberCPLogFileWriter.writeToFile(f"SSH key not found at {private_key_path} [fetchOCSites]")
data_ret = {'status': 0, 'error_message': f'SSH key not found at {private_key_path}. Please ensure One-click Backup is properly configured.'}
return HttpResponse(json.dumps(data_ret))
# Read the key content
key_content = ProcessUtilities.outputExecutioner(f'sudo cat {private_key_path}').rstrip('\n')
if not key_content or key_content.startswith('cat:'):
logging.CyberCPLogFileWriter.writeToFile(f"Failed to read SSH key at {private_key_path} [fetchOCSites]")
data_ret = {'status': 0, 'error_message': f'Could not read SSH key at {private_key_path}. Please check permissions.'}
return HttpResponse(json.dumps(data_ret))
# Load the private key with support for multiple key types
key_file = StringIO(key_content)
key = None
try:
key = paramiko.RSAKey.from_private_key(key_file)
except:
try:
key_file.seek(0)
key = paramiko.Ed25519Key.from_private_key(key_file)
except:
try:
key_file.seek(0)
key = paramiko.ECDSAKey.from_private_key(key_file)
except:
try:
key_file.seek(0)
key = paramiko.DSSKey.from_private_key(key_file)
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f"Failed to load SSH key: {str(e)} [fetchOCSites]")
data_ret = {'status': 0, 'error_message': 'Failed to load SSH key. The key format may be unsupported or corrupted.'}
return HttpResponse(json.dumps(data_ret))
# Connect to the remote server
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# Read the private key content
private_key_path = '/root/.ssh/cyberpanel'
key_content = ProcessUtilities.outputExecutioner(f'cat {private_key_path}').rstrip('\n')
# Load the private key from the content
key_file = StringIO(key_content)
key = paramiko.RSAKey.from_private_key(key_file)
# Connect to the server using the private key
ssh.connect(ip, username=ocb.sftpUser, pkey=key)
# Command to list directories under the specified path
command = f"ls -d cpbackups/{folder}/*"
try:
ssh.connect(ip, username=ocb.sftpUser, pkey=key, timeout=30)
except paramiko.AuthenticationException as e:
logging.CyberCPLogFileWriter.writeToFile(f"SSH Authentication failed for {ocb.sftpUser}@{ip}: {str(e)} [fetchOCSites]")
data_ret = {'status': 0, 'error_message': 'SSH Authentication failed. Your backup account credentials may have changed. Please try redeploying your backup account.'}
return HttpResponse(json.dumps(data_ret))
except paramiko.SSHException as e:
logging.CyberCPLogFileWriter.writeToFile(f"SSH Connection failed to {ip}: {str(e)} [fetchOCSites]")
data_ret = {'status': 0, 'error_message': f'Failed to connect to backup server: {str(e)}. Please check your network connection and try again.'}
return HttpResponse(json.dumps(data_ret))
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f"Unexpected SSH error connecting to {ip}: {str(e)} [fetchOCSites]")
data_ret = {'status': 0, 'error_message': f'Connection to backup server failed: {str(e)}'}
return HttpResponse(json.dumps(data_ret))
# Execute the command
stdin, stdout, stderr = ssh.exec_command(command)
# Execute command to list backup files
command = f"ls -d cpbackups/{folder}/* 2>/dev/null || echo 'NO_FILES_FOUND'"
# Read the results
directories = stdout.read().decode().splitlines()
try:
stdin, stdout, stderr = ssh.exec_command(command)
output = stdout.read().decode().strip()
error_output = stderr.read().decode().strip()
finalDirs = []
if output == 'NO_FILES_FOUND' or not output:
# No backups found in this folder
data_ret = {'status': 1, 'finalDirs': []}
return HttpResponse(json.dumps(data_ret))
# Print directories
for directory in directories:
finalDirs.append(directory.split('/')[2])
directories = output.splitlines()
finalDirs = []
data_ret = {'status': 1, 'finalDirs': finalDirs}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
data_ret = {'status': 0, 'error_message': str(msg)}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
# Extract backup names from paths
for directory in directories:
if directory and '/' in directory:
try:
# Extract the backup filename from path: cpbackups/{folder}/{backup_name}
parts = directory.split('/')
if len(parts) >= 3:
finalDirs.append(parts[2])
except (IndexError, ValueError) as e:
logging.CyberCPLogFileWriter.writeToFile(f"Failed to parse directory path '{directory}': {str(e)} [fetchOCSites]")
continue
data_ret = {'status': 1, 'finalDirs': finalDirs}
return HttpResponse(json.dumps(data_ret))
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f"Failed to execute command on remote server: {str(e)} [fetchOCSites]")
data_ret = {'status': 0, 'error_message': f'Failed to list backups: {str(e)}'}
return HttpResponse(json.dumps(data_ret))
except json.JSONDecodeError as e:
logging.CyberCPLogFileWriter.writeToFile(f"Invalid JSON in request: {str(e)} [fetchOCSites]")
data_ret = {'status': 0, 'error_message': 'Invalid request format.'}
return HttpResponse(json.dumps(data_ret))
except KeyError as e:
logging.CyberCPLogFileWriter.writeToFile(f"Missing required field in request: {str(e)} [fetchOCSites]")
data_ret = {'status': 0, 'error_message': f'Missing required field: {str(e)}'}
return HttpResponse(json.dumps(data_ret))
except Exception as msg:
logging.CyberCPLogFileWriter.writeToFile(f"Unexpected error in fetchOCSites: {str(msg)}")
data_ret = {'status': 0, 'error_message': f'An unexpected error occurred: {str(msg)}'}
return HttpResponse(json.dumps(data_ret))
finally:
# Always close SSH connection
if ssh:
try:
ssh.close()
except:
pass
def StartOCRestore(self, request=None, userID=None, data=None):
try:
@ -2335,122 +2477,159 @@ class BackupManager:
return HttpResponse(json_data)
def DeployAccount(self, request=None, userID=None, data=None):
user = Administrator.objects.get(pk=userID)
"""Deploy a One-Click Backup account by creating SFTP credentials on remote server"""
try:
user = Administrator.objects.get(pk=userID)
userID = request.session['userID']
currentACL = ACLManager.loadedACL(userID)
userID = request.session['userID']
currentACL = ACLManager.loadedACL(userID)
import json
# Parse request data
try:
data = json.loads(request.body)
backup_id = data['id']
except (json.JSONDecodeError, KeyError) as e:
logging.CyberCPLogFileWriter.writeToFile(f"Invalid request data in DeployAccount: {str(e)}")
return HttpResponse(json.dumps({
'status': 0,
'error_message': 'Invalid request format. Missing required field: id'
}))
data = json.loads(request.body)
id = data['id']
# Get backup plan
from IncBackups.models import OneClickBackups
try:
ocb = OneClickBackups.objects.get(pk=backup_id, owner=user)
except OneClickBackups.DoesNotExist:
logging.CyberCPLogFileWriter.writeToFile(f"OneClickBackup {backup_id} not found for user {userID} [DeployAccount]")
return HttpResponse(json.dumps({
'status': 0,
'error_message': 'Backup plan not found or you do not have permission to access it.'
}))
from IncBackups.models import OneClickBackups
ocb = OneClickBackups.objects.get(pk=id, owner=user)
# Check if already deployed
if ocb.state == 1:
logging.CyberCPLogFileWriter.writeToFile(f"Backup plan {backup_id} already deployed [DeployAccount]")
return HttpResponse(json.dumps({
'status': 1,
'error_message': 'This backup account is already deployed.'
}))
data = {}
# Read SSH public key
try:
ssh_pub_key = ProcessUtilities.outputExecutioner('cat /root/.ssh/cyberpanel.pub').strip()
if not ssh_pub_key or ssh_pub_key.startswith('cat:'):
raise Exception("Failed to read SSH public key")
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f"Failed to read SSH public key: {str(e)} [DeployAccount]")
return HttpResponse(json.dumps({
'status': 0,
'error_message': 'SSH public key not found. Please ensure One-Click Backup is properly configured.'
}))
####
# Prepare API request
url = 'https://platform.cyberpersons.com/Billing/CreateSFTPAccount'
payload = {
'sub': ocb.subscription,
'key': ssh_pub_key,
'sftpUser': ocb.sftpUser,
'serverIP': ACLManager.fetchIP(),
'planName': ocb.planName
}
headers = {'Content-Type': 'application/json'}
import requests
import json
# Make API request
try:
response = requests.post(url, headers=headers, data=json.dumps(payload), timeout=30)
except requests.exceptions.RequestException as e:
logging.CyberCPLogFileWriter.writeToFile(f"API request failed: {str(e)} [DeployAccount]")
return HttpResponse(json.dumps({
'status': 0,
'error_message': f'Failed to connect to backup platform: {str(e)}'
}))
# Define the URL of the endpoint
url = 'http://platform.cyberpersons.com/Billing/CreateSFTPAccount' # Replace with your actual endpoint URL
# Handle non-200 responses
if response.status_code != 200:
logging.CyberCPLogFileWriter.writeToFile(f"API returned status {response.status_code}: {response.text} [DeployAccount]")
return HttpResponse(json.dumps({
'status': 0,
'error_message': f'Backup platform returned error (HTTP {response.status_code}). Please try again later.'
}))
# Define the payload to send in the POST request
payload = {
'sub': ocb.subscription,
'key': ProcessUtilities.outputExecutioner(f'cat /root/.ssh/cyberpanel.pub'),
# Replace with the actual SSH public key
'sftpUser': ocb.sftpUser,
'serverIP': ACLManager.fetchIP(), # Replace with the actual server IP
'planName': ocb.planName
}
# Parse API response
try:
response_data = response.json()
except json.JSONDecodeError:
logging.CyberCPLogFileWriter.writeToFile(f"Invalid JSON response from API: {response.text} [DeployAccount]")
return HttpResponse(json.dumps({
'status': 0,
'error_message': 'Received invalid response from backup platform.'
}))
# Convert the payload to JSON format
headers = {'Content-Type': 'application/json'}
dataRet = json.dumps(payload)
# Check if deployment was successful or already deployed
api_status = response_data.get('status')
api_error = response_data.get('error_message', '')
# Make the POST request
response = requests.post(url, headers=headers, data=dataRet)
# Handle the response
# Handle the response
if response.status_code == 200:
response_data = response.json()
if response_data.get('status') == 1:
if api_status == 1 or api_error == "Already deployed.":
# Both cases are success - account exists and is ready
deployment_status = "created" if api_status == 1 else "already deployed"
logging.CyberCPLogFileWriter.writeToFile(f"SFTP account {deployment_status} for {ocb.sftpUser} [DeployAccount]")
# Update backup plan state
ocb.state = 1
ocb.save()
print("SFTP account created successfully.")
finalDic = {}
finalDic['IPAddress'] = response_data.get('ipAddress')
finalDic['password'] = 'NOT-NEEDED'
finalDic['backupSSHPort'] = '22'
finalDic['userName'] = ocb.sftpUser
finalDic['type'] = 'SFTP'
finalDic['path'] = 'cpbackups'
finalDic['name'] = ocb.sftpUser
wm = BackupManager()
response_inner = wm.submitDestinationCreation(userID, finalDic)
response_data_inner = json.loads(response_inner.content.decode('utf-8'))
# Extract the value of 'status'
if response_data_inner.get('status') == 0:
data_ret = {'status': 1, 'error_message': response_data_inner.get('error_message')}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'status': 1,}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
if response_data.get('error_message') == "Already deployed.":
ocb.state = 1
ocb.save()
print("SFTP account created successfully.")
finalDic = {}
finalDic['IPAddress'] = response_data.get('ipAddress')
finalDic['password'] = 'NOT-NEEDED'
finalDic['backupSSHPort'] = '22'
finalDic['userName'] = ocb.sftpUser
finalDic['type'] = 'SFTP'
finalDic['path'] = 'cpbackups'
finalDic['name'] = ocb.sftpUser
# Create local backup destination
finalDic = {
'IPAddress': response_data.get('ipAddress'),
'password': 'NOT-NEEDED',
'backupSSHPort': '22',
'userName': ocb.sftpUser,
'type': 'SFTP',
'path': 'cpbackups',
'name': ocb.sftpUser
}
try:
wm = BackupManager()
response_inner = wm.submitDestinationCreation(userID, finalDic)
response_data_inner = json.loads(response_inner.content.decode('utf-8'))
# Extract the value of 'status'
if response_data_inner.get('status') == 0:
data_ret = {'status': 1, 'error_message': response_data_inner.get('error_message')}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'status': 1, }
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
# Destination creation failed, but account is deployed
logging.CyberCPLogFileWriter.writeToFile(
f"Destination creation failed: {response_data_inner.get('error_message')} [DeployAccount]"
)
return HttpResponse(json.dumps({
'status': 0,
'error_message': f"Account deployed but failed to create local destination: {response_data_inner.get('error_message')}"
}))
data_ret = {'status': 0, 'error_message': response_data.get('error_message')}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data['message'] = f"[1991] Failed to create sftp account {response.text}"
data_ret = {'status': 0, 'error_message': response.text}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
# Full success
return HttpResponse(json.dumps({
'status': 1,
'message': f'Backup account {deployment_status} successfully.'
}))
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f"Failed to create destination: {str(e)} [DeployAccount]")
return HttpResponse(json.dumps({
'status': 0,
'error_message': f'Account deployed but failed to create local destination: {str(e)}'
}))
else:
# API returned an error
logging.CyberCPLogFileWriter.writeToFile(f"API returned error: {api_error} [DeployAccount]")
return HttpResponse(json.dumps({
'status': 0,
'error_message': api_error or 'Unknown error occurred during deployment.'
}))
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f"Unexpected error in DeployAccount: {str(e)}")
return HttpResponse(json.dumps({
'status': 0,
'error_message': f'An unexpected error occurred: {str(e)}'
}))
def ReconfigureSubscription(self, request=None, userID=None, data=None):
try:

View File

@ -380,8 +380,8 @@
</h1>
<p class="page-subtitle">{% trans "Schedule automated backups to protect your data on localhost or remote server" %}</p>
<div class="header-actions">
<a href="https://cyberpanel.net/KnowledgeBase/home/schedule-backups-local-or-sftp/"
target="_blank"
<a href="https://cyberpanel.net/KnowledgeBase/home/schedule-backups-local-or-sftp/"
target="_blank"
class="btn-secondary">
<i class="fas fa-book"></i>
{% trans "Documentation" %}
@ -389,6 +389,114 @@
</div>
</div>
<!-- Storage and Backup Stats Section -->
<div class="main-card" style="margin-bottom: 2rem;">
<div class="card-header">
<h2 class="card-title">
<i class="fas fa-chart-pie"></i>
{% trans "Backup Account Overview" %}
</h2>
</div>
<div class="card-body">
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); gap: 1.5rem;">
<!-- Storage Stats -->
<div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 1.5rem; border-radius: 12px; color: white;">
<div style="display: flex; align-items: center; gap: 1rem; margin-bottom: 0.5rem;">
<i class="fas fa-hdd" style="font-size: 2rem; opacity: 0.9;"></i>
<div>
<div style="font-size: 0.875rem; opacity: 0.9;">{% trans "Storage Used" %}</div>
<div style="font-size: 1.5rem; font-weight: 700;">{{ storage_info.used_storage }}</div>
</div>
</div>
<div style="font-size: 0.875rem; opacity: 0.9;">{% trans "of" %} {{ storage_info.total_storage }}</div>
<div style="background: rgba(255,255,255,0.2); height: 8px; border-radius: 4px; margin-top: 0.75rem; overflow: hidden;">
<div style="background: rgba(255,255,255,0.9); height: 100%; width: {{ storage_info.usage_percentage }}%; transition: width 0.3s ease;"></div>
</div>
</div>
<!-- Last Backup -->
<div style="background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%); padding: 1.5rem; border-radius: 12px; color: white;">
<div style="display: flex; align-items: center; gap: 1rem;">
<i class="fas fa-clock" style="font-size: 2rem; opacity: 0.9;"></i>
<div>
<div style="font-size: 0.875rem; opacity: 0.9;">{% trans "Last Backup Run" %}</div>
<div style="font-size: 1.125rem; font-weight: 600;">{{ storage_info.last_backup_run }}</div>
<div style="font-size: 0.875rem; opacity: 0.9; margin-top: 0.25rem;">
{% if storage_info.last_backup_status == 'success' %}
<i class="fas fa-check-circle"></i> {% trans "Success" %}
{% elif storage_info.last_backup_status == 'failed' %}
<i class="fas fa-exclamation-circle"></i> {% trans "Failed" %}
{% else %}
{{ storage_info.last_backup_status }}
{% endif %}
</div>
</div>
</div>
</div>
<!-- Total Backups -->
<div style="background: linear-gradient(135deg, #4facfe 0%, #00f2fe 100%); padding: 1.5rem; border-radius: 12px; color: white;">
<div style="display: flex; align-items: center; gap: 1rem;">
<i class="fas fa-database" style="font-size: 2rem; opacity: 0.9;"></i>
<div>
<div style="font-size: 0.875rem; opacity: 0.9;">{% trans "Total Backups" %}</div>
<div style="font-size: 1.5rem; font-weight: 700;">{{ storage_info.total_backups }}</div>
{% if storage_info.failed_backups > 0 %}
<div style="font-size: 0.875rem; opacity: 0.9; margin-top: 0.25rem;">
<i class="fas fa-exclamation-triangle"></i> {{ storage_info.failed_backups }} {% trans "failed" %}
</div>
{% endif %}
</div>
</div>
</div>
<!-- Account Info -->
<div style="background: linear-gradient(135deg, #43e97b 0%, #38f9d7 100%); padding: 1.5rem; border-radius: 12px; color: white;">
<div style="display: flex; align-items: center; gap: 1rem;">
<i class="fas fa-user-circle" style="font-size: 2rem; opacity: 0.9;"></i>
<div>
<div style="font-size: 0.875rem; opacity: 0.9;">{% trans "Backup Account" %}</div>
<div style="font-size: 1.125rem; font-weight: 600;">{{ ocb_sftp_user }}</div>
<div style="font-size: 0.875rem; opacity: 0.9; margin-top: 0.25rem;">{{ ocb_plan_name }}</div>
</div>
</div>
</div>
</div>
</div>
</div>
<!-- Error Logs Section -->
{% if storage_info.error_logs %}
<div class="main-card" style="margin-bottom: 2rem;">
<div class="card-header" style="background: linear-gradient(135deg, #fee2e2 0%, #fecaca 100%);">
<h2 class="card-title" style="color: #991b1b;">
<i class="fas fa-exclamation-triangle"></i>
{% trans "Recent Backup Errors" %}
</h2>
</div>
<div class="card-body" style="padding: 0;">
<table class="data-table">
<thead>
<tr>
<th style="width: 180px;">{% trans "Date/Time" %}</th>
<th style="width: 150px;">{% trans "Website" %}</th>
<th>{% trans "Error Message" %}</th>
</tr>
</thead>
<tbody>
{% for log in storage_info.error_logs %}
<tr>
<td>{{ log.timestamp }}</td>
<td><strong>{{ log.website }}</strong></td>
<td style="color: var(--danger-text, #991b1b);">{{ log.error_message }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
{% endif %}
<div ng-controller="scheduleBackup">
<!-- Create New Schedule Card -->
<div class="main-card">

View File

@ -1039,8 +1039,9 @@ class FileManager:
'error_message': "Symlink attack."})
return HttpResponse(final_json)
# Set home directory ownership
command = 'chown %s:%s /home/%s' % (website.externalApp, website.externalApp, domainName)
ProcessUtilities.popenExecutioner(command)
ProcessUtilities.executioner(command)
### Sym link checks
@ -1053,27 +1054,21 @@ class FileManager:
'error_message': "Symlink attack."})
return HttpResponse(final_json)
command = 'chown -R -P %s:%s /home/%s/public_html/*' % (externalApp, externalApp, domainName)
ProcessUtilities.popenExecutioner(command)
command = 'chown -R -P %s:%s /home/%s/public_html/.[^.]*' % (externalApp, externalApp, domainName)
ProcessUtilities.popenExecutioner(command)
# command = "chown root:%s /home/" % (groupName) + domainName + "/logs"
# ProcessUtilities.popenExecutioner(command)
# Set file permissions first (before ownership to avoid conflicts)
command = "find %s -type d -exec chmod 0755 {} \;" % ("/home/" + domainName + "/public_html")
ProcessUtilities.popenExecutioner(command)
ProcessUtilities.executioner(command)
command = "find %s -type f -exec chmod 0644 {} \;" % ("/home/" + domainName + "/public_html")
ProcessUtilities.popenExecutioner(command)
command = 'chown %s:%s /home/%s/public_html' % (externalApp, groupName, domainName)
ProcessUtilities.executioner(command)
command = 'chmod 750 /home/%s/public_html' % (domainName)
# Set ownership for all files inside public_html to user:user
command = 'chown -R -P %s:%s /home/%s/public_html/*' % (externalApp, externalApp, domainName)
ProcessUtilities.executioner(command)
command = 'chown -R -P %s:%s /home/%s/public_html/.[^.]*' % (externalApp, externalApp, domainName)
ProcessUtilities.executioner(command)
# Process child domains first
for childs in website.childdomains_set.all():
command = 'ls -la %s' % childs.path
result = ProcessUtilities.outputExecutioner(command)
@ -1084,21 +1079,30 @@ class FileManager:
'error_message': "Symlink attack."})
return HttpResponse(final_json)
# Set file permissions first
command = "find %s -type d -exec chmod 0755 {} \;" % (childs.path)
ProcessUtilities.popenExecutioner(command)
ProcessUtilities.executioner(command)
command = "find %s -type f -exec chmod 0644 {} \;" % (childs.path)
ProcessUtilities.popenExecutioner(command)
ProcessUtilities.executioner(command)
# Set ownership for all files inside child domain to user:user
command = 'chown -R -P %s:%s %s/*' % (externalApp, externalApp, childs.path)
ProcessUtilities.popenExecutioner(command)
ProcessUtilities.executioner(command)
command = 'chown -R -P %s:%s %s/.[^.]*' % (externalApp, externalApp, childs.path)
ProcessUtilities.popenExecutioner(command)
ProcessUtilities.executioner(command)
# Set child domain directory itself to 755 with user:nogroup
command = 'chmod 755 %s' % (childs.path)
ProcessUtilities.popenExecutioner(command)
ProcessUtilities.executioner(command)
command = 'chown %s:%s %s' % (externalApp, groupName, childs.path)
ProcessUtilities.popenExecutioner(command)
ProcessUtilities.executioner(command)
# Set public_html directory itself to user:nogroup with 750 permissions (done at the end)
command = 'chown %s:%s /home/%s/public_html' % (externalApp, groupName, domainName)
ProcessUtilities.executioner(command)
command = 'chmod 750 /home/%s/public_html' % (domainName)
ProcessUtilities.executioner(command)

View File

@ -8,6 +8,9 @@ import os
import sys
import secrets
import string
import socket
import urllib.request
import re
from pathlib import Path
def generate_secure_password(length=24):
@ -27,16 +30,53 @@ def generate_secure_password(length=24):
def generate_secret_key(length=64):
"""
Generate a cryptographically secure Django secret key
Args:
length: Length of the secret key to generate (default 64)
Returns:
str: Random secret key
"""
chars = string.ascii_letters + string.digits + '!@#$%^&*(-_=+)'
return ''.join(secrets.choice(chars) for _ in range(length))
def get_public_ip():
"""Get the public IP address of the server using multiple methods"""
methods = [
'https://ipv4.icanhazip.com',
'https://api.ipify.org',
'https://checkip.amazonaws.com',
'https://ipecho.net/plain'
]
for url in methods:
try:
with urllib.request.urlopen(url, timeout=10) as response:
ip = response.read().decode('utf-8').strip()
# Validate IP format
if re.match(r'^(\d{1,3}\.){3}\d{1,3}$', ip):
print(f"✓ Detected public IP: {ip}")
return ip
except Exception as e:
print(f"Failed to get IP from {url}: {e}")
continue
print("⚠️ Could not detect public IP address")
return None
def get_local_ip():
"""Get the local IP address of the server"""
try:
# Connect to a remote address to determine the local IP
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
print(f"✓ Detected local IP: {local_ip}")
return local_ip
except Exception as e:
print(f"Failed to detect local IP: {e}")
return None
def create_env_file(cyberpanel_path, mysql_root_password=None, cyberpanel_db_password=None):
"""
Create .env file with generated secure credentials
@ -56,15 +96,49 @@ def create_env_file(cyberpanel_path, mysql_root_password=None, cyberpanel_db_pas
secret_key = generate_secret_key(64)
# Get hostname for ALLOWED_HOSTS
import socket
# Auto-detect IP addresses for ALLOWED_HOSTS
print("🔍 Auto-detecting server IP addresses...")
# Get hostname and local hostname resolution
try:
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
hostname_ip = socket.gethostbyname(hostname)
except:
hostname = 'localhost'
local_ip = '127.0.0.1'
hostname_ip = '127.0.0.1'
# Get actual local IP address
local_ip = get_local_ip()
# Get public IP address
public_ip = get_public_ip()
# Build ALLOWED_HOSTS list with all detected IPs
allowed_hosts = ['localhost', '127.0.0.1']
# Add hostname if different from localhost
if hostname and hostname != 'localhost':
allowed_hosts.append(hostname)
# Add hostname IP if different from localhost
if hostname_ip and hostname_ip not in allowed_hosts:
allowed_hosts.append(hostname_ip)
# Add local IP if detected and different
if local_ip and local_ip not in allowed_hosts:
allowed_hosts.append(local_ip)
# Add public IP if detected and different
if public_ip and public_ip not in allowed_hosts:
allowed_hosts.append(public_ip)
# Add wildcard for maximum compatibility (allows any host)
# This ensures CyberPanel works regardless of how the server is accessed
allowed_hosts.append('*')
allowed_hosts_str = ','.join(allowed_hosts)
print(f"✓ ALLOWED_HOSTS configured: {allowed_hosts_str}")
# Create .env content
env_content = f"""# CyberPanel Environment Configuration
# Generated automatically during installation - DO NOT EDIT MANUALLY
@ -73,7 +147,7 @@ def create_env_file(cyberpanel_path, mysql_root_password=None, cyberpanel_db_pas
# Django Configuration
SECRET_KEY={secret_key}
DEBUG=False
ALLOWED_HOSTS=localhost,127.0.0.1,{hostname},{local_ip}
ALLOWED_HOSTS={allowed_hosts_str}
# Database Configuration - CyberPanel Database
DB_NAME=cyberpanel

View File

@ -619,7 +619,8 @@ password="%s"
logging.InstallLog.writeToFile("Generating secure environment configuration!")
# Generate secure environment file instead of hardcoding passwords
self.generate_secure_env_file(mysqlPassword, password)
# Note: password = MySQL root password, mysqlPassword = CyberPanel DB password
self.generate_secure_env_file(password, mysqlPassword)
logging.InstallLog.writeToFile("Environment configuration generated successfully!")

View File

@ -1058,9 +1058,11 @@ def Main(cwd, mysql, distro, ent, serial=None, port="8090", ftp=None, dns=None,
except:
pass
if distro == centos:
# For RHEL-based systems (CentOS, AlmaLinux, Rocky, etc.), generate a separate password
if distro in [centos, cent8, openeuler]:
InstallCyberPanel.mysqlPassword = install_utils.generate_pass()
else:
# For Ubuntu/Debian, use the same password as root
InstallCyberPanel.mysqlPassword = InstallCyberPanel.mysql_Root_password
installer = InstallCyberPanel("/usr/local/lsws/", cwd, distro, ent, serial, port, ftp, dns, publicip, remotemysql,

112
install/setup_env.py Normal file
View File

@ -0,0 +1,112 @@
#!/usr/bin/env python3
"""
Environment setup script for CyberPanel
Automatically detects server IP and configures .env file
"""
import os
import socket
import urllib.request
import re
import shutil
from pathlib import Path
def get_public_ip():
"""Get the public IP address of the server using multiple methods"""
methods = [
'https://ipv4.icanhazip.com',
'https://api.ipify.org',
'https://checkip.amazonaws.com',
'https://ipecho.net/plain'
]
for url in methods:
try:
with urllib.request.urlopen(url, timeout=10) as response:
ip = response.read().decode('utf-8').strip()
# Validate IP format
if re.match(r'^(\d{1,3}\.){3}\d{1,3}$', ip):
return ip
except Exception:
continue
return None
def get_local_ip():
"""Get the local IP address of the server"""
try:
# Connect to a remote address to determine the local IP
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
except Exception:
return None
def setup_env_file(cyberpanel_root="/usr/local/CyberCP"):
"""Set up the .env file with auto-detected IP addresses"""
env_template_path = os.path.join(cyberpanel_root, ".env.template")
env_path = os.path.join(cyberpanel_root, ".env")
# Get IPs
public_ip = get_public_ip()
local_ip = get_local_ip()
# Build ALLOWED_HOSTS list
allowed_hosts = ['localhost', '127.0.0.1']
if local_ip and local_ip not in allowed_hosts:
allowed_hosts.append(local_ip)
if public_ip and public_ip not in allowed_hosts:
allowed_hosts.append(public_ip)
# For maximum compatibility, also include wildcard
# This ensures the server works regardless of how it's accessed
allowed_hosts.append('*')
allowed_hosts_str = ','.join(allowed_hosts)
print(f"Auto-detected public IP: {public_ip}")
print(f"Auto-detected local IP: {local_ip}")
print(f"Setting ALLOWED_HOSTS to: {allowed_hosts_str}")
# If .env doesn't exist, copy from template
if not os.path.exists(env_path):
if os.path.exists(env_template_path):
shutil.copy2(env_template_path, env_path)
print(f"Created .env file from template")
else:
print(f"Warning: Template file not found at {env_template_path}")
return False
# Update ALLOWED_HOSTS in .env file
try:
with open(env_path, 'r') as f:
content = f.read()
# Update ALLOWED_HOSTS line
content = re.sub(
r'^ALLOWED_HOSTS=.*$',
f'ALLOWED_HOSTS={allowed_hosts_str}',
content,
flags=re.MULTILINE
)
with open(env_path, 'w') as f:
f.write(content)
print(f"Updated ALLOWED_HOSTS in {env_path}")
return True
except Exception as e:
print(f"Error updating .env file: {e}")
return False
if __name__ == "__main__":
import sys
# Allow custom path as argument
cyberpanel_root = sys.argv[1] if len(sys.argv) > 1 else "/usr/local/CyberCP"
success = setup_env_file(cyberpanel_root)
sys.exit(0 if success else 1)

View File

@ -897,78 +897,164 @@ Automatic backup failed for %s on %s.
for site in websites:
from datetime import datetime, timedelta
import hashlib
Yesterday = (datetime.now() - timedelta(days=1)).strftime("%m.%d.%Y")
print(f'date of yesterday {Yesterday}')
# Command to list directories under the specified path
command = f"ls -d {finalPath}/*"
# Try SSH command first
directories = []
try:
# Execute the command
stdin, stdout, stderr = ssh.exec_command(command, timeout=10)
# Enhanced backup verification with multiple methods
backup_found = False
backup_file_path = None
file_size = 0
# Read the results
directories = stdout.read().decode().splitlines()
except:
# If SSH command fails, try using SFTP
logging.writeToFile(f'SSH ls command failed for {destinationConfig["ip"]}, trying SFTP listdir')
try:
sftp = ssh.open_sftp()
# List files in the directory
files = sftp.listdir(finalPath)
# Format them similar to ls -d output
directories = [f"{finalPath}/{f}" for f in files]
sftp.close()
except BaseException as msg:
logging.writeToFile(f'Failed to list directory via SFTP: {str(msg)}')
directories = []
if actualDomain:
check_domain = site.domain
else:
check_domain = site.domain.domain
if os.path.exists(ProcessUtilities.debugPath):
logging.writeToFile(str(directories))
# Method 1 & 3: Use timestamp-based filename and filter to only today's backup directory
# Expected filename format: backup-{domain}-{timestamp}.tar.gz
# Where timestamp from line 515: currentTime = time.strftime("%m.%d.%Y_%H-%M-%S")
try:
# Method 3: Only search within today's backup directory (finalPath already contains today's timestamp)
if ssh_commands_supported:
# Use find command to search for backup files with domain name in today's directory
# -size +1k filters files larger than 1KB (Method 2: size validation)
command = f"find {finalPath} -name '*{check_domain}*.tar.gz' -type f -size +1k 2>/dev/null"
startCheck = 0
for directory in directories:
if directory.find(site.domain):
print(f'site in backup, no need to notify {site.domain}')
startCheck = 1
break
try:
stdin, stdout, stderr = ssh.exec_command(command, timeout=15)
matching_files = stdout.read().decode().strip().splitlines()
if matching_files:
# Found backup file(s), verify the first one
backup_file_path = matching_files[0]
# Method 2: Get and validate file size
try:
size_command = f"stat -c%s '{backup_file_path}' 2>/dev/null || stat -f%z '{backup_file_path}' 2>/dev/null"
stdin, stdout, stderr = ssh.exec_command(size_command, timeout=10)
file_size = int(stdout.read().decode().strip())
# Require at least 1KB for valid backup
if file_size >= 1024:
backup_found = True
logging.CyberCPLogFileWriter.writeToFile(
f'Backup verified for {check_domain}: {backup_file_path} ({file_size} bytes) [IncScheduler.startNormalBackups]'
)
# Method 5: Optional checksum verification for additional integrity check
# Only do checksum if we have the local backup file for comparison
# This is optional and adds extra verification
try:
# Calculate remote checksum
checksum_command = f"sha256sum '{backup_file_path}' 2>/dev/null | awk '{{print $1}}'"
stdin, stdout, stderr = ssh.exec_command(checksum_command, timeout=60)
remote_checksum = stdout.read().decode().strip()
if remote_checksum and len(remote_checksum) == 64: # Valid SHA256 length
logging.CyberCPLogFileWriter.writeToFile(
f'Backup checksum verified for {check_domain}: {remote_checksum[:16]}... [IncScheduler.startNormalBackups]'
)
except:
# Checksum verification is optional, don't fail if it doesn't work
pass
else:
logging.CyberCPLogFileWriter.writeToFile(
f'Backup file too small for {check_domain}: {backup_file_path} ({file_size} bytes, minimum 1KB required) [IncScheduler.startNormalBackups]'
)
except Exception as size_err:
# If we can't get size but file exists, still consider it found
backup_found = True
logging.CyberCPLogFileWriter.writeToFile(
f'Backup found for {check_domain}: {backup_file_path} (size check failed: {str(size_err)}) [IncScheduler.startNormalBackups]'
)
except Exception as find_err:
logging.CyberCPLogFileWriter.writeToFile(f'SSH find command failed: {str(find_err)}, falling back to SFTP [IncScheduler.startNormalBackups]')
# Fallback to SFTP if SSH commands not supported or failed
if not backup_found:
try:
sftp = ssh.open_sftp()
# List files in today's backup directory only (Method 3)
try:
files = sftp.listdir(finalPath)
except FileNotFoundError:
logging.CyberCPLogFileWriter.writeToFile(f'Backup directory not found: {finalPath} [IncScheduler.startNormalBackups]')
files = []
# Check each file for domain match and validate
for f in files:
# Method 1: Check if domain is in filename and it's a tar.gz
if check_domain in f and f.endswith('.tar.gz'):
file_path = f"{finalPath}/{f}"
try:
# Method 2: Validate file size
file_stat = sftp.stat(file_path)
file_size = file_stat.st_size
if file_size >= 1024: # At least 1KB
backup_found = True
backup_file_path = file_path
logging.CyberCPLogFileWriter.writeToFile(
f'Backup verified for {check_domain} via SFTP: {file_path} ({file_size} bytes) [IncScheduler.startNormalBackups]'
)
break
else:
logging.CyberCPLogFileWriter.writeToFile(
f'Backup file too small for {check_domain}: {file_path} ({file_size} bytes) [IncScheduler.startNormalBackups]'
)
except Exception as stat_err:
logging.CyberCPLogFileWriter.writeToFile(f'Failed to stat file {file_path}: {str(stat_err)} [IncScheduler.startNormalBackups]')
sftp.close()
except Exception as sftp_err:
logging.CyberCPLogFileWriter.writeToFile(f'SFTP verification failed: {str(sftp_err)} [IncScheduler.startNormalBackups]')
# Only send notification if backup was NOT found (backup failed)
if not backup_found:
logging.CyberCPLogFileWriter.writeToFile(f'Backup NOT found for {check_domain}, sending failure notification [IncScheduler.startNormalBackups]')
if startCheck:
'send notification that backup failed'
import requests
# Define the URL of the endpoint
url = 'http://platform.cyberpersons.com/Billing/BackupFailedNotify' # Replace with your actual endpoint URL
url = 'https://platform.cyberpersons.com/Billing/BackupFailedNotify'
# Define the payload to send in the POST request
payload = {
'sub': ocb.subscription,
'subject': f'Failed to backup {site.domain} on {ACLManager.fetchIP()}.',
'message':f'Hi, \n\n Failed to create backup for {site.domain} on on {ACLManager.fetchIP()}. \n\n Please contact our support team at: http://platform.cyberpersons.com\n\nThank you.',
# Replace with the actual SSH public key
'subject': f'Backup Failed for {check_domain} on {ACLManager.fetchIP()}',
'message': f'Hi,\n\nFailed to create backup for {check_domain} on {ACLManager.fetchIP()}.\n\nBackup was scheduled but the backup file was not found on the remote server after the backup job completed.\n\nPlease check your server logs for more details or contact support at: https://platform.cyberpersons.com\n\nThank you.',
'sftpUser': ocb.sftpUser,
'serverIP': ACLManager.fetchIP(), # Replace with the actual server IP
'serverIP': ACLManager.fetchIP(),
'status': 'failed' # Critical: tells platform to send email
}
# Convert the payload to JSON format
headers = {'Content-Type': 'application/json'}
dataRet = json.dumps(payload)
# Make the POST request
response = requests.post(url, headers=headers, data=dataRet)
try:
# Make the POST request with timeout
response = requests.post(url, headers=headers, data=json.dumps(payload), timeout=30)
# # Handle the response
# # Handle the response
# if response.status_code == 200:
# response_data = response.json()
# if response_data.get('status') == 1:
except:
pass
if response.status_code == 200:
response_data = response.json()
if response_data.get('status') == 1:
logging.CyberCPLogFileWriter.writeToFile(f'Failure notification sent successfully for {check_domain} [IncScheduler.startNormalBackups]')
else:
logging.CyberCPLogFileWriter.writeToFile(f'Failure notification API returned error for {check_domain}: {response_data.get("error_message")} [IncScheduler.startNormalBackups]')
else:
logging.CyberCPLogFileWriter.writeToFile(f'Failure notification API returned HTTP {response.status_code} for {check_domain} [IncScheduler.startNormalBackups]')
except requests.exceptions.RequestException as e:
logging.CyberCPLogFileWriter.writeToFile(f'Failed to send backup failure notification for {check_domain}: {str(e)} [IncScheduler.startNormalBackups]')
else:
logging.CyberCPLogFileWriter.writeToFile(f'Backup verified successful for {check_domain}, no notification needed [IncScheduler.startNormalBackups]')
except Exception as msg:
logging.CyberCPLogFileWriter.writeToFile(f'Error checking backup status for site: {str(msg)} [IncScheduler.startNormalBackups]')
except:
pass

View File

@ -6706,6 +6706,15 @@ class ApplicationInstaller(multi.Thread):
####
# Ensure /home/cyberpanel directory exists with proper permissions
if not os.path.exists('/home/cyberpanel'):
command = 'mkdir -p /home/cyberpanel'
ProcessUtilities.executioner(command)
# Set proper permissions to allow application to write to the directory
command = 'chmod 755 /home/cyberpanel'
ProcessUtilities.executioner(command)
sftp = ssh.open_sftp()
logging.statusWriter(self.tempStatusPath, 'Downloading Backups...,15')
@ -6724,26 +6733,33 @@ class ApplicationInstaller(multi.Thread):
successRet = stdout.read().decode().strip()
errorRet = stderr.read().decode().strip()
if os.path.exists(ProcessUtilities.debugPath):
logging.writeToFile(f"Command used to retrieve backup {command}")
if errorRet:
# Check if SCP had errors and fallback to SFTP if needed
if errorRet:
if os.path.exists(ProcessUtilities.debugPath):
logging.writeToFile(f"Error in scp command to retrieve backup {errorRet}")
logging.writeToFile(f"Command used to retrieve backup {command}")
statusFile = open(tempStatusPath, 'w')
statusFile.writelines(f"SCP failed, falling back to SFTP...,20")
statusFile.close()
try:
sftp.get(f'cpbackups/{folder}/{backupfile}', f'/home/cyberpanel/{backupfile}',
callback=self.UpdateDownloadStatus)
if os.path.exists(ProcessUtilities.debugPath):
logging.writeToFile(f"Successfully downloaded via SFTP")
except BaseException as msg:
logging.writeToFile(f"Failed to download file {str(msg)} [404]")
statusFile = open(tempStatusPath, 'w')
statusFile.writelines(f"Error in scp command to retrieve backup {errorRet}.")
statusFile.writelines(f"Failed to download file {str(msg)} [404]")
statusFile.close()
try:
sftp.get(f'cpbackups/{folder}/{backupfile}', f'/home/cyberpanel/{backupfile}',
callback=self.UpdateDownloadStatus)
except BaseException as msg:
logging.writeToFile(f"Failed to download file {str(msg)} [404]")
statusFile = open(tempStatusPath, 'w')
statusFile.writelines(f"Failed to download file {str(msg)} [404]")
statusFile.close()
return 0
else:
return 0
else:
if os.path.exists(ProcessUtilities.debugPath):
logging.writeToFile(f"Success in scp command to retrieve backup {successRet}")
logging.writeToFile(f"Command used to retrieve backup {command}")

View File

@ -2457,8 +2457,17 @@ def submitBackupCreation(tempStoragePath, backupName, backupPath, backupDomain):
## This login can be further improved later.
logging.CyberCPLogFileWriter.writeToFile('Failed to create database backup for %s. This could be false positive, moving on.' % (dbName))
command = f'mv /home/cyberpanel/{dbName}.sql {CPHomeStorage}/{dbName}.sql'
ProcessUtilities.executioner(command)
# Move database backup (check for both .sql.gz and .sql)
if os.path.exists(f'/home/cyberpanel/{dbName}.sql.gz'):
command = f'mv /home/cyberpanel/{dbName}.sql.gz {CPHomeStorage}/{dbName}.sql.gz'
ProcessUtilities.executioner(command)
# Also move metadata file if it exists
if os.path.exists(f'/home/cyberpanel/{dbName}.backup.json'):
command = f'mv /home/cyberpanel/{dbName}.backup.json {CPHomeStorage}/{dbName}.backup.json'
ProcessUtilities.executioner(command)
elif os.path.exists(f'/home/cyberpanel/{dbName}.sql'):
command = f'mv /home/cyberpanel/{dbName}.sql {CPHomeStorage}/{dbName}.sql'
ProcessUtilities.executioner(command)
##

View File

@ -0,0 +1,21 @@
{
"database_backup": {
"use_compression": false,
"use_new_features": false,
"parallel_threads": 4,
"single_transaction": true,
"compress_on_fly": false,
"compression_level": 6,
"fallback_to_legacy": true
},
"compatibility": {
"maintain_legacy_format": true,
"dual_format_backup": false,
"auto_detect_restore": true
},
"file_backup": {
"use_parallel_compression": false,
"compression_algorithm": "gzip",
"rsync_compression": false
}
}

View File

@ -631,7 +631,7 @@ class CustomACME:
if response.status_code == 200:
# Wait for order to be processed
max_attempts = 30
max_attempts = 10
delay = 2
for attempt in range(max_attempts):
if not self._get_nonce():
@ -667,7 +667,7 @@ class CustomACME:
f'Order status check failed, attempt {attempt + 1}/{max_attempts}')
time.sleep(delay)
logging.CyberCPLogFileWriter.writeToFile('Order processing timed out')
logging.CyberCPLogFileWriter.writeToFile('Order processing timed out after 20 seconds')
return False
return False
except Exception as e:
@ -709,7 +709,7 @@ class CustomACME:
logging.CyberCPLogFileWriter.writeToFile(f'Error downloading certificate: {str(e)}')
return None
def _wait_for_challenge_validation(self, challenge_url, max_attempts=30, delay=2):
def _wait_for_challenge_validation(self, challenge_url, max_attempts=10, delay=2):
"""Wait for challenge to be validated by the ACME server"""
try:
logging.CyberCPLogFileWriter.writeToFile(f'Waiting for challenge validation at URL: {challenge_url}')
@ -736,14 +736,36 @@ class CustomACME:
logging.CyberCPLogFileWriter.writeToFile('Challenge validated successfully')
return True
elif challenge_status == 'invalid':
logging.CyberCPLogFileWriter.writeToFile('Challenge validation failed')
# Check for DNS-related errors in the response
response_data = response.json()
error_detail = response_data.get('error', {}).get('detail', '')
# Common DNS-related error patterns
dns_errors = [
'NXDOMAIN',
'DNS problem',
'No valid IP addresses',
'could not be resolved',
'DNS resolution',
'Timeout during connect',
'Connection refused',
'no such host'
]
is_dns_error = any(err.lower() in error_detail.lower() for err in dns_errors)
if is_dns_error:
logging.CyberCPLogFileWriter.writeToFile(
f'Challenge validation failed due to DNS issue: {error_detail}')
else:
logging.CyberCPLogFileWriter.writeToFile(
f'Challenge validation failed: {error_detail}')
return False
logging.CyberCPLogFileWriter.writeToFile(
f'Challenge still pending, attempt {attempt + 1}/{max_attempts}')
time.sleep(delay)
logging.CyberCPLogFileWriter.writeToFile('Challenge validation timed out')
logging.CyberCPLogFileWriter.writeToFile('Challenge validation timed out after 20 seconds')
return False
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error waiting for challenge validation: {str(e)}')
@ -768,94 +790,114 @@ class CustomACME:
try:
logging.CyberCPLogFileWriter.writeToFile(f'Checking DNS records for domain: {domain}')
# List of public DNS servers to check against
# List of public DNS servers to check against (reduced to 2 for faster checks)
dns_servers = [
'8.8.8.8', # Google DNS
'1.1.1.1', # Cloudflare DNS
'208.67.222.222' # OpenDNS
'1.1.1.1' # Cloudflare DNS
]
# Function to check DNS record with specific DNS server
def check_with_dns_server(server, record_type='A'):
try:
# Create a new socket for each check
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(5) # 5 second timeout
# Set the DNS server
sock.connect((server, 53))
# Create DNS query
query = bytearray()
# DNS header
query += b'\x00\x01' # Transaction ID
query += b'\x01\x00' # Flags: Standard query
query += b'\x00\x01' # Questions: 1
query += b'\x00\x00' # Answer RRs: 0
query += b'\x00\x00' # Authority RRs: 0
query += b'\x00\x00' # Additional RRs: 0
# Domain name
for part in domain.split('.'):
query.append(len(part))
query.extend(part.encode())
query += b'\x00' # End of domain name
# Query type and class
if record_type == 'A':
query += b'\x00\x01' # Type: A
else: # AAAA
query += b'\x00\x1c' # Type: AAAA
query += b'\x00\x01' # Class: IN
# Send query
sock.send(query)
# Receive response
response = sock.recv(1024)
# Check if we got a valid response
if len(response) > 12: # Minimum DNS response size
# Check if there are answers in the response
answer_count = int.from_bytes(response[6:8], 'big')
if answer_count > 0:
return True
return False
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error checking DNS with server {server}: {str(e)}')
return False
finally:
sock.close()
# Check A records (IPv4) with multiple DNS servers
# Use system's DNS resolver as primary check (faster and respects local config)
a_record_found = False
for server in dns_servers:
if check_with_dns_server(server, 'A'):
a_record_found = True
break
# Check AAAA records (IPv6) with multiple DNS servers
aaaa_record_found = False
for server in dns_servers:
if check_with_dns_server(server, 'AAAA'):
aaaa_record_found = True
break
# Also check with system's DNS resolver as a fallback
try:
# Try to resolve A record (IPv4)
# Try to resolve A record (IPv4) with timeout
old_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(3) # 3 second timeout
socket.gethostbyname(domain)
a_record_found = True
socket.setdefaulttimeout(old_timeout)
except socket.gaierror:
socket.setdefaulttimeout(old_timeout)
pass
except socket.timeout:
socket.setdefaulttimeout(old_timeout)
pass
try:
# Try to resolve AAAA record (IPv6)
# Try to resolve AAAA record (IPv6) with timeout
old_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(3) # 3 second timeout
socket.getaddrinfo(domain, None, socket.AF_INET6)
aaaa_record_found = True
socket.setdefaulttimeout(old_timeout)
except socket.gaierror:
socket.setdefaulttimeout(old_timeout)
pass
except socket.timeout:
socket.setdefaulttimeout(old_timeout)
pass
# If system resolver fails, try public DNS servers as fallback
if not a_record_found and not aaaa_record_found:
# Function to check DNS record with specific DNS server
def check_with_dns_server(server, record_type='A'):
try:
# Create a new socket for each check
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(2) # 2 second timeout
# Set the DNS server
sock.connect((server, 53))
# Create DNS query
query = bytearray()
# DNS header
query += b'\x00\x01' # Transaction ID
query += b'\x01\x00' # Flags: Standard query
query += b'\x00\x01' # Questions: 1
query += b'\x00\x00' # Answer RRs: 0
query += b'\x00\x00' # Authority RRs: 0
query += b'\x00\x00' # Additional RRs: 0
# Domain name
for part in domain.split('.'):
query.append(len(part))
query.extend(part.encode())
query += b'\x00' # End of domain name
# Query type and class
if record_type == 'A':
query += b'\x00\x01' # Type: A
else: # AAAA
query += b'\x00\x1c' # Type: AAAA
query += b'\x00\x01' # Class: IN
# Send query
sock.send(query)
# Receive response
response = sock.recv(1024)
# Check if we got a valid response
if len(response) > 12: # Minimum DNS response size
# Check if there are answers in the response
answer_count = int.from_bytes(response[6:8], 'big')
if answer_count > 0:
return True
return False
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error checking DNS with server {server}: {str(e)}')
return False
finally:
try:
sock.close()
except:
pass
# Check A records (IPv4) with first available DNS server only
for server in dns_servers:
if check_with_dns_server(server, 'A'):
a_record_found = True
break
# Only check AAAA if A record wasn't found and we still have time
if not a_record_found:
for server in dns_servers:
if check_with_dns_server(server, 'AAAA'):
aaaa_record_found = True
break
# Log the results
if a_record_found:
@ -870,7 +912,7 @@ class CustomACME:
logging.CyberCPLogFileWriter.writeToFile(f'Error checking DNS records: {str(e)}')
return False
def _wait_for_order_processing(self, max_attempts=30, delay=2):
def _wait_for_order_processing(self, max_attempts=10, delay=2):
"""Wait for order to be processed"""
try:
logging.CyberCPLogFileWriter.writeToFile('Waiting for order processing...')
@ -910,7 +952,7 @@ class CustomACME:
f'Order status check failed, attempt {attempt + 1}/{max_attempts}')
time.sleep(delay)
logging.CyberCPLogFileWriter.writeToFile('Order processing timed out')
logging.CyberCPLogFileWriter.writeToFile('Order processing timed out after 20 seconds')
return False
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error waiting for order processing: {str(e)}')

View File

@ -249,8 +249,24 @@ class mysqlUtilities:
return str(msg)
@staticmethod
def createDatabaseBackup(databaseName, tempStoragePath, rustic=0, RusticRepoName = None, externalApp = None):
def createDatabaseBackup(databaseName, tempStoragePath, rustic=0, RusticRepoName = None,
externalApp = None, use_compression=None, use_new_features=None):
"""
Enhanced database backup with backward compatibility
Parameters:
- use_compression: None (auto-detect), True (force compression), False (no compression)
- use_new_features: None (auto-detect based on config), True/False (force)
"""
try:
# Check if new features are enabled (via config file or parameter)
if use_new_features is None:
use_new_features = mysqlUtilities.checkNewBackupFeatures()
# Determine compression based on config or parameter
if use_compression is None:
use_compression = mysqlUtilities.shouldUseCompression()
passFile = "/etc/cyberpanel/mysqlPassword"
try:
@ -291,53 +307,58 @@ password=%s
SHELL = False
if rustic == 0:
# Determine backup file extension based on compression
if use_compression:
backup_extension = '.sql.gz'
backup_file = f"{tempStoragePath}/{databaseName}{backup_extension}"
else:
backup_extension = '.sql'
backup_file = f"{tempStoragePath}/{databaseName}{backup_extension}"
command = 'rm -f ' + tempStoragePath + "/" + databaseName + '.sql'
# Remove old backup if exists
command = f'rm -f {backup_file}'
ProcessUtilities.executioner(command)
command = 'mysqldump --defaults-file=/home/cyberpanel/.my.cnf -u %s --host=%s --port %s %s' % (mysqluser, mysqlhost, mysqlport, databaseName)
# Build mysqldump command with new features
dump_cmd = mysqlUtilities.buildMysqldumpCommand(
mysqluser, mysqlhost, mysqlport, databaseName,
use_new_features, use_compression
)
# if os.path.exists(ProcessUtilities.debugPath):
# logging.CyberCPLogFileWriter.writeToFile(command)
#
# logging.CyberCPLogFileWriter.writeToFile(f'Get current executing uid {os.getuid()}')
#
# cmd = shlex.split(command)
#
# try:
# errorPath = '/home/cyberpanel/error-logs.txt'
# errorLog = open(errorPath, 'a')
# with open(tempStoragePath + "/" + databaseName + '.sql', 'w') as f:
# res = subprocess.call(cmd, stdout=f, stderr=errorLog, shell=SHELL)
# if res != 0:
# logging.CyberCPLogFileWriter.writeToFile(
# "Database: " + databaseName + "could not be backed! [createDatabaseBackup]")
# return 0
# except subprocess.CalledProcessError as msg:
# logging.CyberCPLogFileWriter.writeToFile(
# "Database: " + databaseName + "could not be backed! Error: %s. [createDatabaseBackup]" % (
# str(msg)))
# return 0
if use_compression:
# New method: Stream directly to compressed file
full_command = f"{dump_cmd} | gzip -c > {backup_file}"
result = ProcessUtilities.executioner(full_command, shell=True)
cmd = shlex.split(command)
with open(tempStoragePath + "/" + databaseName + '.sql', 'w') as f:
# Using subprocess.run to capture stdout and stderr
result = subprocess.run(
cmd,
stdout=f,
stderr=subprocess.PIPE,
shell=SHELL
)
# Check if the command was successful
if result.returncode != 0:
# Verify backup file was created successfully
if not os.path.exists(backup_file) or os.path.getsize(backup_file) == 0:
logging.CyberCPLogFileWriter.writeToFile(
"Database: " + databaseName + " could not be backed up! [createDatabaseBackup]"
f"Database: {databaseName} could not be backed up (compressed)! [createDatabaseBackup]"
)
# Log stderr
logging.CyberCPLogFileWriter.writeToFile(result.stderr.decode('utf-8'))
return 0
else:
# Legacy method: Direct dump to file (backward compatible)
cmd = shlex.split(dump_cmd)
with open(backup_file, 'w') as f:
result = subprocess.run(
cmd,
stdout=f,
stderr=subprocess.PIPE,
shell=SHELL
)
if result.returncode != 0:
logging.CyberCPLogFileWriter.writeToFile(
"Database: " + databaseName + " could not be backed up! [createDatabaseBackup]"
)
logging.CyberCPLogFileWriter.writeToFile(result.stderr.decode('utf-8'))
return 0
# Store metadata about backup format for restore
mysqlUtilities.saveBackupMetadata(
databaseName, tempStoragePath, use_compression, use_new_features
)
else:
SHELL = True
@ -369,6 +390,9 @@ password=%s
@staticmethod
def restoreDatabaseBackup(databaseName, tempStoragePath, dbPassword, passwordCheck = None, additionalName = None, rustic=0, RusticRepoName = None, externalApp = None, snapshotid = None):
"""
Enhanced restore with automatic format detection
"""
try:
passFile = "/etc/cyberpanel/mysqlPassword"
@ -409,25 +433,55 @@ password=%s
subprocess.call(shlex.split(command))
if rustic == 0:
# Auto-detect backup format
backup_format = mysqlUtilities.detectBackupFormat(
tempStoragePath, databaseName, additionalName
)
command = 'mysql --defaults-file=/home/cyberpanel/.my.cnf -u %s --host=%s --port %s %s' % (mysqluser, mysqlhost, mysqlport, databaseName)
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(f'{command} {tempStoragePath}/{databaseName} ' )
cmd = shlex.split(command)
if additionalName == None:
with open(tempStoragePath + "/" + databaseName + '.sql', 'r') as f:
res = subprocess.call(cmd, stdin=f)
if res != 0:
logging.CyberCPLogFileWriter.writeToFile("Could not restore MYSQL database: " + databaseName +"! [restoreDatabaseBackup]")
return 0
if additionalName:
base_name = additionalName
else:
with open(tempStoragePath + "/" + additionalName + '.sql', 'r') as f:
res = subprocess.call(cmd, stdin=f)
base_name = databaseName
if res != 0:
logging.CyberCPLogFileWriter.writeToFile("Could not restore MYSQL database: " + additionalName + "! [restoreDatabaseBackup]")
return 0
# Determine actual backup file based on detected format
if backup_format['compressed']:
backup_file = f"{tempStoragePath}/{base_name}.sql.gz"
if not os.path.exists(backup_file):
# Fallback to uncompressed for backward compatibility
backup_file = f"{tempStoragePath}/{base_name}.sql"
backup_format['compressed'] = False
else:
backup_file = f"{tempStoragePath}/{base_name}.sql"
if not os.path.exists(backup_file):
# Try compressed version
backup_file = f"{tempStoragePath}/{base_name}.sql.gz"
if os.path.exists(backup_file):
backup_format['compressed'] = True
if not os.path.exists(backup_file):
logging.CyberCPLogFileWriter.writeToFile(
f"Backup file not found: {backup_file}"
)
return 0
# Build restore command
mysql_cmd = f'mysql --defaults-file=/home/cyberpanel/.my.cnf -u {mysqluser} --host={mysqlhost} --port {mysqlport} {databaseName}'
if backup_format['compressed']:
# Handle compressed backup
restore_cmd = f"gunzip -c {backup_file} | {mysql_cmd}"
result = ProcessUtilities.executioner(restore_cmd, shell=True)
# Don't rely solely on exit code, MySQL import usually succeeds
# The passwordCheck logic below will verify database integrity
else:
# Handle uncompressed backup (legacy)
cmd = shlex.split(mysql_cmd)
with open(backup_file, 'r') as f:
result = subprocess.call(cmd, stdin=f)
# Don't fail on non-zero exit as MySQL may return warnings
# The passwordCheck logic below will verify database integrity
if passwordCheck == None:
@ -449,6 +503,8 @@ password=%s
logging.CyberCPLogFileWriter.writeToFile(f'{command} {tempStoragePath}/{databaseName} ')
ProcessUtilities.outputExecutioner(command, None, True)
return 1
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile(str(msg) + "[restoreDatabaseBackup]")
return 0
@ -1220,6 +1276,153 @@ gpgcheck=1
logging.CyberCPLogFileWriter.statusWriter(tempStatusPath, 'Completed [200]')
@staticmethod
def buildMysqldumpCommand(user, host, port, database, use_new_features, use_compression):
"""Build mysqldump command with appropriate options"""
base_cmd = f"mysqldump --defaults-file=/home/cyberpanel/.my.cnf -u {user} --host={host} --port {port}"
# Add new performance features if enabled
if use_new_features:
# Add single-transaction for InnoDB consistency
base_cmd += " --single-transaction"
# Add extended insert for better performance
base_cmd += " --extended-insert"
# Add order by primary for consistent dumps
base_cmd += " --order-by-primary"
# Add quick option to avoid loading entire result set
base_cmd += " --quick"
# Add lock tables option
base_cmd += " --lock-tables=false"
# Check MySQL version for parallel support
if mysqlUtilities.supportParallelDump():
# Get number of threads (max 4 for safety)
threads = min(4, ProcessUtilities.getNumberOfCores() if hasattr(ProcessUtilities, 'getNumberOfCores') else 2)
base_cmd += f" --parallel={threads}"
base_cmd += f" {database}"
return base_cmd
@staticmethod
def saveBackupMetadata(database, path, compressed, new_features):
"""Save metadata about backup format for restore compatibility"""
import time
metadata = {
'database': database,
'compressed': compressed,
'new_features': new_features,
'backup_version': '2.0' if new_features else '1.0',
'timestamp': time.time()
}
metadata_file = f"{path}/{database}.backup.json"
with open(metadata_file, 'w') as f:
json.dump(metadata, f)
@staticmethod
def detectBackupFormat(path, database, additional_name=None):
"""
Detect backup format from metadata or file extension
"""
base_name = additional_name if additional_name else database
# First try to read metadata file (new backups will have this)
metadata_file = f"{path}/{base_name}.backup.json"
if os.path.exists(metadata_file):
try:
with open(metadata_file, 'r') as f:
return json.load(f)
except:
pass
# Fallback: detect by file existence and extension
format_info = {
'compressed': False,
'new_features': False,
'backup_version': '1.0'
}
# Check for compressed file
if os.path.exists(f"{path}/{base_name}.sql.gz"):
format_info['compressed'] = True
# Compressed backups likely use new features
format_info['new_features'] = True
format_info['backup_version'] = '2.0'
elif os.path.exists(f"{path}/{base_name}.sql"):
format_info['compressed'] = False
# Check file content for new features indicators
format_info['new_features'] = mysqlUtilities.checkSQLFileFeatures(
f"{path}/{base_name}.sql"
)
return format_info
@staticmethod
def checkNewBackupFeatures():
"""Check if new backup features are enabled"""
try:
config_file = '/usr/local/CyberCP/plogical/backup_config.json'
if not os.path.exists(config_file):
# Try alternate location
config_file = '/etc/cyberpanel/backup_config.json'
if os.path.exists(config_file):
with open(config_file, 'r') as f:
config = json.load(f)
return config.get('database_backup', {}).get('use_new_features', False)
except:
pass
return False # Default to legacy mode for safety
@staticmethod
def shouldUseCompression():
"""Check if compression should be used"""
try:
config_file = '/usr/local/CyberCP/plogical/backup_config.json'
if not os.path.exists(config_file):
# Try alternate location
config_file = '/etc/cyberpanel/backup_config.json'
if os.path.exists(config_file):
with open(config_file, 'r') as f:
config = json.load(f)
return config.get('database_backup', {}).get('use_compression', False)
except:
pass
return False # Default to no compression for compatibility
@staticmethod
def supportParallelDump():
"""Check if MySQL version supports parallel dump"""
try:
result = ProcessUtilities.outputExecutioner("mysql --version")
# MySQL 8.0+ and MariaDB 10.3+ support parallel dump
if "8.0" in result or "8.1" in result or "10.3" in result or "10.4" in result or "10.5" in result or "10.6" in result:
return True
except:
pass
return False
@staticmethod
def checkSQLFileFeatures(file_path):
"""Check SQL file for new feature indicators"""
try:
# Read first few lines to check for new features
with open(file_path, 'r') as f:
head = f.read(2048) # Read first 2KB
# Check for indicators of new features
if "--single-transaction" in head or "--extended-insert" in head or "-- Dump completed" in head:
return True
except:
pass
return False
def main():
parser = argparse.ArgumentParser(description='CyberPanel')

View File

@ -553,6 +553,21 @@ class ProcessUtilities(multi.Thread):
print("An error occurred:", e)
return None
@staticmethod
def getNumberOfCores():
"""Get the number of CPU cores available on the system"""
try:
import multiprocessing
return multiprocessing.cpu_count()
except:
try:
# Fallback method using /proc/cpuinfo
with open('/proc/cpuinfo', 'r') as f:
return len([line for line in f if line.startswith('processor')])
except:
# Default to 2 if we can't determine
return 2
@staticmethod
def fetch_latest_prestashop_version():
import requests

View File

@ -820,10 +820,9 @@ context /.well-known/acme-challenge {
logging.CyberCPLogFileWriter.writeToFile(
f"www.{virtualHostName} has no DNS records, excluding from acme.sh SSL request")
# Step 1: Issue the certificate (staging) - this stores config in /root/.acme.sh/
command = acmePath + " --issue" + domain_list \
+ ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --staging' \
+ ' --webroot-path /usr/local/lsws/Example/html'
+ ' -w /usr/local/lsws/Example/html -k ec-256 --force --staging'
try:
result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=True)
@ -833,10 +832,9 @@ context /.well-known/acme-challenge {
universal_newlines=True, shell=True)
if result.returncode == 0:
# Step 2: Issue the certificate (production) - this stores config in /root/.acme.sh/
command = acmePath + " --issue" + domain_list \
+ ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt' \
+ ' --webroot-path /usr/local/lsws/Example/html'
+ ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt'
try:
result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=True)
@ -846,11 +844,25 @@ context /.well-known/acme-challenge {
universal_newlines=True, shell=True)
if result.returncode == 0:
logging.CyberCPLogFileWriter.writeToFile(
"Successfully obtained SSL for: " + virtualHostName + " and: www." + virtualHostName, 0)
logging.CyberCPLogFileWriter.SendEmail(sender_email, adminEmail, result.stdout,
'SSL Notification for %s.' % (virtualHostName))
return 1
# Step 3: Install the certificate to the desired location
install_command = acmePath + " --install-cert -d " + virtualHostName \
+ ' --cert-file ' + existingCertPath + '/cert.pem' \
+ ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem'
try:
install_result = subprocess.run(install_command, capture_output=True, universal_newlines=True, shell=True)
except TypeError:
# Fallback for Python < 3.7
install_result = subprocess.run(install_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
if install_result.returncode == 0:
logging.CyberCPLogFileWriter.writeToFile(
"Successfully obtained SSL for: " + virtualHostName + " and: www." + virtualHostName, 0)
logging.CyberCPLogFileWriter.SendEmail(sender_email, adminEmail, result.stdout,
'SSL Notification for %s.' % (virtualHostName))
return 1
return 0
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(str(e))
@ -876,9 +888,9 @@ context /.well-known/acme-challenge {
if sslUtilities.checkDNSRecords(f'www.{aliasDomain}'):
domain_list += " -d www." + aliasDomain
# Step 1: Issue the certificate - this stores config in /root/.acme.sh/
command = acmePath + " --issue" + domain_list \
+ ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt'
+ ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt'
try:
result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=True)
@ -888,7 +900,21 @@ context /.well-known/acme-challenge {
universal_newlines=True, shell=True)
if result.returncode == 0:
return 1
# Step 2: Install the certificate to the desired location
install_command = acmePath + " --install-cert -d " + virtualHostName \
+ ' --cert-file ' + existingCertPath + '/cert.pem' \
+ ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem'
try:
install_result = subprocess.run(install_command, capture_output=True, universal_newlines=True, shell=True)
except TypeError:
# Fallback for Python < 3.7
install_result = subprocess.run(install_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
if install_result.returncode == 0:
return 1
return 0
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(str(e))

View File

@ -0,0 +1,300 @@
#!/usr/local/CyberCP/bin/python
"""
Test script to verify backward compatibility of database backup improvements
Tests both legacy and new backup/restore paths
"""
import os
import sys
import json
import tempfile
import shutil
sys.path.append('/usr/local/CyberCP')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CyberCP.settings")
from plogical.mysqlUtilities import mysqlUtilities
from plogical.processUtilities import ProcessUtilities
class BackupCompatibilityTests:
"""Test suite for backup compatibility"""
@staticmethod
def setup_test_environment():
"""Create a test directory for backups"""
test_dir = tempfile.mkdtemp(prefix="cyberpanel_backup_test_")
print(f"Created test directory: {test_dir}")
return test_dir
@staticmethod
def cleanup_test_environment(test_dir):
"""Clean up test directory"""
if os.path.exists(test_dir):
shutil.rmtree(test_dir)
print(f"Cleaned up test directory: {test_dir}")
@staticmethod
def test_config_file():
"""Test configuration file reading"""
print("\n=== Testing Configuration File ===")
config_file = '/usr/local/CyberCP/plogical/backup_config.json'
if os.path.exists(config_file):
with open(config_file, 'r') as f:
config = json.load(f)
print(f"Configuration loaded successfully")
print(f"Use compression: {config['database_backup']['use_compression']}")
print(f"Use new features: {config['database_backup']['use_new_features']}")
print(f"Auto-detect restore: {config['compatibility']['auto_detect_restore']}")
return True
else:
print(f"Configuration file not found at {config_file}")
return False
@staticmethod
def test_helper_functions():
"""Test helper functions"""
print("\n=== Testing Helper Functions ===")
# Test checkNewBackupFeatures
new_features = mysqlUtilities.checkNewBackupFeatures()
print(f"New backup features enabled: {new_features}")
# Test shouldUseCompression
use_compression = mysqlUtilities.shouldUseCompression()
print(f"Compression enabled: {use_compression}")
# Test supportParallelDump
parallel_support = mysqlUtilities.supportParallelDump()
print(f"Parallel dump supported: {parallel_support}")
# Test getNumberOfCores
cores = ProcessUtilities.getNumberOfCores()
print(f"Number of CPU cores: {cores}")
return True
@staticmethod
def test_legacy_backup(test_db="test_legacy_db", test_dir="/tmp"):
"""Test that legacy backups still work"""
print("\n=== Testing Legacy Backup (No Compression, No New Features) ===")
try:
# Create backup with old method
print(f"Creating legacy backup for {test_db}...")
result = mysqlUtilities.createDatabaseBackup(
test_db, test_dir, use_compression=False, use_new_features=False
)
if result == 1:
print(f"✓ Legacy backup created successfully")
# Check that .sql file exists (not .sql.gz)
legacy_file = f"{test_dir}/{test_db}.sql"
if os.path.exists(legacy_file):
file_size = os.path.getsize(legacy_file)
print(f"✓ Legacy backup file exists: {legacy_file}")
print(f" File size: {file_size} bytes")
# Check metadata file
metadata_file = f"{test_dir}/{test_db}.backup.json"
if os.path.exists(metadata_file):
with open(metadata_file, 'r') as f:
metadata = json.load(f)
print(f"✓ Metadata file exists")
print(f" Backup version: {metadata['backup_version']}")
print(f" Compressed: {metadata['compressed']}")
print(f" New features: {metadata['new_features']}")
return True
else:
print(f"✗ Legacy backup file not found: {legacy_file}")
return False
else:
print(f"✗ Legacy backup failed")
return False
except Exception as e:
print(f"✗ Error during legacy backup test: {str(e)}")
return False
@staticmethod
def test_new_backup(test_db="test_new_db", test_dir="/tmp"):
"""Test new compressed backups"""
print("\n=== Testing New Backup (With Compression and New Features) ===")
try:
# Create backup with new method
print(f"Creating compressed backup for {test_db}...")
result = mysqlUtilities.createDatabaseBackup(
test_db, test_dir, use_compression=True, use_new_features=True
)
if result == 1:
print(f"✓ New backup created successfully")
# Check that .sql.gz file exists
compressed_file = f"{test_dir}/{test_db}.sql.gz"
if os.path.exists(compressed_file):
file_size = os.path.getsize(compressed_file)
print(f"✓ Compressed backup file exists: {compressed_file}")
print(f" File size: {file_size} bytes")
# Check metadata file
metadata_file = f"{test_dir}/{test_db}.backup.json"
if os.path.exists(metadata_file):
with open(metadata_file, 'r') as f:
metadata = json.load(f)
print(f"✓ Metadata file exists")
print(f" Backup version: {metadata['backup_version']}")
print(f" Compressed: {metadata['compressed']}")
print(f" New features: {metadata['new_features']}")
return True
else:
print(f"✗ Compressed backup file not found: {compressed_file}")
# Check if legacy file was created instead
legacy_file = f"{test_dir}/{test_db}.sql"
if os.path.exists(legacy_file):
print(f" Note: Legacy file exists instead: {legacy_file}")
return False
else:
print(f"✗ New backup failed")
return False
except Exception as e:
print(f"✗ Error during new backup test: {str(e)}")
return False
@staticmethod
def test_format_detection(test_dir="/tmp"):
"""Test backup format auto-detection"""
print("\n=== Testing Format Detection ===")
# Test detection of compressed backup
test_db = "test_detect"
# Create a dummy compressed backup
compressed_file = f"{test_dir}/{test_db}.sql.gz"
with open(compressed_file, 'wb') as f:
f.write(b'\x1f\x8b\x08\x00\x00\x00\x00\x00') # gzip header
# Create metadata
metadata = {
'database': test_db,
'compressed': True,
'new_features': True,
'backup_version': '2.0'
}
metadata_file = f"{test_dir}/{test_db}.backup.json"
with open(metadata_file, 'w') as f:
json.dump(metadata, f)
# Test detection
detected_format = mysqlUtilities.detectBackupFormat(test_dir, test_db)
print(f"Detected format for compressed backup:")
print(f" Compressed: {detected_format['compressed']}")
print(f" New features: {detected_format['new_features']}")
print(f" Version: {detected_format['backup_version']}")
# Clean up test files
os.remove(compressed_file)
os.remove(metadata_file)
# Create a dummy uncompressed backup
uncompressed_file = f"{test_dir}/{test_db}.sql"
with open(uncompressed_file, 'w') as f:
f.write("-- MySQL dump\n")
# Test detection without metadata
detected_format = mysqlUtilities.detectBackupFormat(test_dir, test_db)
print(f"\nDetected format for uncompressed backup (no metadata):")
print(f" Compressed: {detected_format['compressed']}")
print(f" New features: {detected_format['new_features']}")
print(f" Version: {detected_format['backup_version']}")
# Clean up
os.remove(uncompressed_file)
return True
@staticmethod
def test_mysqldump_command():
"""Test mysqldump command building"""
print("\n=== Testing MySQL Dump Command Building ===")
# Test legacy command
legacy_cmd = mysqlUtilities.buildMysqldumpCommand(
"root", "localhost", "3306", "test_db",
use_new_features=False, use_compression=False
)
print(f"Legacy command: {legacy_cmd}")
# Test new command with features
new_cmd = mysqlUtilities.buildMysqldumpCommand(
"root", "localhost", "3306", "test_db",
use_new_features=True, use_compression=True
)
print(f"New command: {new_cmd}")
return True
@staticmethod
def run_all_tests():
"""Run all compatibility tests"""
print("=" * 60)
print("CyberPanel Database Backup Compatibility Test Suite")
print("=" * 60)
all_passed = True
# Test configuration
if not BackupCompatibilityTests.test_config_file():
all_passed = False
# Test helper functions
if not BackupCompatibilityTests.test_helper_functions():
all_passed = False
# Test mysqldump command building
if not BackupCompatibilityTests.test_mysqldump_command():
all_passed = False
# Setup test environment
test_dir = BackupCompatibilityTests.setup_test_environment()
try:
# Test format detection
if not BackupCompatibilityTests.test_format_detection(test_dir):
all_passed = False
# Note: Actual backup/restore tests would require a real database
# These are commented out but show the structure
# # Test legacy backup
# if not BackupCompatibilityTests.test_legacy_backup("test_db", test_dir):
# all_passed = False
# # Test new backup
# if not BackupCompatibilityTests.test_new_backup("test_db", test_dir):
# all_passed = False
finally:
# Cleanup
BackupCompatibilityTests.cleanup_test_environment(test_dir)
print("\n" + "=" * 60)
if all_passed:
print("✓ All tests passed successfully!")
print("The backup system is fully backward compatible.")
else:
print("✗ Some tests failed. Please check the output above.")
print("=" * 60)
return all_passed
if __name__ == "__main__":
# Run the test suite
success = BackupCompatibilityTests.run_all_tests()
sys.exit(0 if success else 1)

View File

@ -1249,13 +1249,50 @@ $cfg['Servers'][$i]['LogoutURL'] = 'phpmyadminsignin.php?logout';
`completed_at` datetime(6) DEFAULT NULL,
KEY `ai_scanner_scheduled_executions_scheduled_scan_id_idx` (`scheduled_scan_id`),
KEY `ai_scanner_scheduled_executions_execution_time_idx` (`execution_time` DESC),
CONSTRAINT `ai_scanner_scheduled_executions_scheduled_scan_id_fk` FOREIGN KEY (`scheduled_scan_id`)
CONSTRAINT `ai_scanner_scheduled_executions_scheduled_scan_id_fk` FOREIGN KEY (`scheduled_scan_id`)
REFERENCES `ai_scanner_scheduled_scans` (`id`) ON DELETE CASCADE
)
''')
except:
pass
# AI Scanner File Operation Audit Tables
try:
cursor.execute('''
CREATE TABLE `scanner_file_operations` (
`id` integer AUTO_INCREMENT NOT NULL PRIMARY KEY,
`scan_id` varchar(255) NOT NULL,
`operation` varchar(20) NOT NULL,
`file_path` varchar(500) NOT NULL,
`backup_path` varchar(500) DEFAULT NULL,
`success` bool NOT NULL DEFAULT 0,
`error_message` longtext DEFAULT NULL,
`ip_address` varchar(45) DEFAULT NULL,
`user_agent` varchar(255) DEFAULT NULL,
`created_at` datetime(6) NOT NULL,
KEY `scanner_file_operations_scan_id_idx` (`scan_id`),
KEY `scanner_file_operations_created_at_idx` (`created_at`),
KEY `scanner_file_operations_scan_created_idx` (`scan_id`, `created_at`)
)
''')
except:
pass
try:
cursor.execute('''
CREATE TABLE `scanner_api_rate_limits` (
`id` integer AUTO_INCREMENT NOT NULL PRIMARY KEY,
`scan_id` varchar(255) NOT NULL,
`endpoint` varchar(100) NOT NULL,
`request_count` integer NOT NULL DEFAULT 0,
`last_request_at` datetime(6) NOT NULL,
UNIQUE KEY `scanner_api_rate_limits_scan_endpoint_unique` (`scan_id`, `endpoint`),
KEY `scanner_api_rate_limits_scan_endpoint_idx` (`scan_id`, `endpoint`)
)
''')
except:
pass
try:
cursor.execute(
'CREATE TABLE `loginSystem_acl` (`id` integer AUTO_INCREMENT NOT NULL PRIMARY KEY, `name` varchar(50) NOT NULL UNIQUE, `adminStatus` integer NOT NULL DEFAULT 0, `versionManagement` integer NOT NULL DEFAULT 0, `createNewUser` integer NOT NULL DEFAULT 0, `deleteUser` integer NOT NULL DEFAULT 0, `resellerCenter` integer NOT NULL DEFAULT 0, `changeUserACL` integer NOT NULL DEFAULT 0, `createWebsite` integer NOT NULL DEFAULT 0, `modifyWebsite` integer NOT NULL DEFAULT 0, `suspendWebsite` integer NOT NULL DEFAULT 0, `deleteWebsite` integer NOT NULL DEFAULT 0, `createPackage` integer NOT NULL DEFAULT 0, `deletePackage` integer NOT NULL DEFAULT 0, `modifyPackage` integer NOT NULL DEFAULT 0, `createDatabase` integer NOT NULL DEFAULT 0, `deleteDatabase` integer NOT NULL DEFAULT 0, `listDatabases` integer NOT NULL DEFAULT 0, `createNameServer` integer NOT NULL DEFAULT 0, `createDNSZone` integer NOT NULL DEFAULT 0, `deleteZone` integer NOT NULL DEFAULT 0, `addDeleteRecords` integer NOT NULL DEFAULT 0, `createEmail` integer NOT NULL DEFAULT 0, `deleteEmail` integer NOT NULL DEFAULT 0, `emailForwarding` integer NOT NULL DEFAULT 0, `changeEmailPassword` integer NOT NULL DEFAULT 0, `dkimManager` integer NOT NULL DEFAULT 0, `createFTPAccount` integer NOT NULL DEFAULT 0, `deleteFTPAccount` integer NOT NULL DEFAULT 0, `listFTPAccounts` integer NOT NULL DEFAULT 0, `createBackup` integer NOT NULL DEFAULT 0, `restoreBackup` integer NOT NULL DEFAULT 0, `addDeleteDestinations` integer NOT NULL DEFAULT 0, `scheduleBackups` integer NOT NULL DEFAULT 0, `remoteBackups` integer NOT NULL DEFAULT 0, `manageSSL` integer NOT NULL DEFAULT 0, `hostnameSSL` integer NOT NULL DEFAULT 0, `mailServerSSL` integer NOT NULL DEFAULT 0)')

View File

@ -25,7 +25,7 @@ from managePHP.phpManager import PHPManager
from plogical.vhostConfs import vhostConfs
from ApachController.ApacheVhosts import ApacheVhost
try:
from websiteFunctions.models import Websites, ChildDomains, aliasDomains, DockerSites
from websiteFunctions.models import Websites, ChildDomains, aliasDomains, DockerSites, WPSites, WPStaging
from databases.models import Databases
except:
pass
@ -404,6 +404,21 @@ class vhost:
if ACLManager.FindIfChild() == 0:
### Delete WordPress Sites and Staging Sites first
try:
wpSites = WPSites.objects.filter(owner=delWebsite)
for wpSite in wpSites:
# Delete any staging sites associated with this WP site
stagingSites = WPStaging.objects.filter(wpsite=wpSite)
for staging in stagingSites:
staging.delete()
logging.CyberCPLogFileWriter.writeToFile(f"Deleted staging site record: {staging.id}")
# Delete the WP site itself
wpSite.delete()
logging.CyberCPLogFileWriter.writeToFile(f"Deleted WP site: {wpSite.id}")
except Exception as msg:
logging.CyberCPLogFileWriter.writeToFile(f"Error cleaning up WP/Staging sites: {str(msg)}")
### Delete Docker Sites first before website deletion
if os.path.exists('/home/docker/%s' % (virtualHostName)):
@ -497,6 +512,21 @@ class vhost:
## child check to make sure no database entires are being deleted from child server
if ACLManager.FindIfChild() == 0:
### Delete WordPress Sites and Staging Sites first
try:
wpSites = WPSites.objects.filter(owner=delWebsite)
for wpSite in wpSites:
# Delete any staging sites associated with this WP site
stagingSites = WPStaging.objects.filter(wpsite=wpSite)
for staging in stagingSites:
staging.delete()
logging.CyberCPLogFileWriter.writeToFile(f"Deleted staging site record: {staging.id}")
# Delete the WP site itself
wpSite.delete()
logging.CyberCPLogFileWriter.writeToFile(f"Deleted WP site: {wpSite.id}")
except Exception as msg:
logging.CyberCPLogFileWriter.writeToFile(f"Error cleaning up WP/Staging sites: {str(msg)}")
for items in databases:
mysqlUtilities.deleteDatabase(items.dbName, items.dbUser)

View File

@ -32,7 +32,7 @@ from ApachController.ApacheVhosts import ApacheVhost
from managePHP.phpManager import PHPManager
try:
from websiteFunctions.models import Websites, ChildDomains, aliasDomains
from websiteFunctions.models import Websites, ChildDomains, aliasDomains, WPSites, WPStaging
from loginSystem.models import Administrator
from packages.models import Package
from CLManager.models import CLPackages
@ -598,6 +598,41 @@ local_name %s {
'This website already exists as child domain. [404]')
return 0, "This website already exists as child domain."
# Check for orphaned staging site domain conflicts
try:
# Check if there are any WP sites with FinalURL matching this domain
conflicting_wp_sites = WPSites.objects.filter(FinalURL__icontains=virtualHostName)
for wp_site in conflicting_wp_sites:
# Check if the WP site's owner website still exists
try:
owner_website = wp_site.owner
if not Websites.objects.filter(id=owner_website.id).exists():
# Orphaned WP site found, clean it up
wp_site.delete()
logging.CyberCPLogFileWriter.writeToFile(f"Cleaned up orphaned WP site: {wp_site.id} with URL: {wp_site.FinalURL}")
except:
# WP site owner is missing, delete it
wp_site.delete()
logging.CyberCPLogFileWriter.writeToFile(f"Cleaned up orphaned WP site: {wp_site.id} (missing owner)")
# Check for orphaned staging sites
orphaned_staging = WPStaging.objects.filter(wpsite__FinalURL__icontains=virtualHostName)
for staging in orphaned_staging:
try:
# Check if the staging site's wpsite still exists and has valid owner
wpsite = staging.wpsite
owner_website = wpsite.owner
if not Websites.objects.filter(id=owner_website.id).exists():
# Owner website doesn't exist, clean up staging
staging.delete()
logging.CyberCPLogFileWriter.writeToFile(f"Cleaned up orphaned staging site: {staging.id}")
except:
# Staging site has invalid references, delete it
staging.delete()
logging.CyberCPLogFileWriter.writeToFile(f"Cleaned up orphaned staging site: {staging.id} (invalid references)")
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f"Error during staging site cleanup: {str(e)}")
####### Limitations Check End
logging.CyberCPLogFileWriter.statusWriter(tempStatusPath, 'Creating DNS records..,10')

View File

@ -36,4 +36,5 @@ uvicorn
asyncssh
python-jose
websockets
PyJWT
PyJWT
python-dotenv

130
test_api_auth.py Normal file
View File

@ -0,0 +1,130 @@
#!/usr/bin/env python3
"""
Test script to verify both Bearer token and X-API-Key authentication work
for CyberPanel AI Scanner file operations.
"""
import requests
import json
import sys
# Test configuration
BASE_URL = "http://localhost:8001" # Adjust if needed
SCAN_ID = "test-scan-123"
FILE_PATH = "wp-content/plugins/test.php"
def test_bearer_auth(token):
"""Test with Bearer token authentication"""
print("Testing Bearer token authentication...")
headers = {
"Authorization": f"Bearer {token}",
"X-Scan-ID": SCAN_ID,
"Content-Type": "application/json"
}
# Test get-file endpoint
url = f"{BASE_URL}/api/scanner/get-file"
params = {"file_path": FILE_PATH}
response = requests.get(url, params=params, headers=headers)
print(f"Bearer auth response: {response.status_code}")
if response.status_code != 200:
print(f"Response: {response.text}")
return response.status_code == 200 or response.status_code == 404 # 404 is ok if file doesn't exist
def test_api_key_auth(api_key):
"""Test with X-API-Key authentication"""
print("\nTesting X-API-Key authentication...")
headers = {
"X-API-Key": api_key,
"X-Scan-ID": SCAN_ID,
"Content-Type": "application/json"
}
# Test get-file endpoint
url = f"{BASE_URL}/api/scanner/get-file"
params = {"file_path": FILE_PATH}
response = requests.get(url, params=params, headers=headers)
print(f"X-API-Key auth response: {response.status_code}")
if response.status_code != 200:
print(f"Response: {response.text}")
return response.status_code == 200 or response.status_code == 404 # 404 is ok if file doesn't exist
def test_mixed_endpoints():
"""Test different endpoints with both authentication methods"""
print("\n" + "="*50)
print("Testing multiple endpoints with both auth methods")
print("="*50)
# You would need real tokens for this to work
test_token = "cp_test_token_12345"
test_api_key = "cp_test_api_key_67890"
endpoints = [
("GET", "/api/ai-scanner/files/list", {"path": "wp-content"}),
("GET", "/api/ai-scanner/files/content", {"path": FILE_PATH}),
("GET", "/api/scanner/get-file", {"file_path": FILE_PATH}),
]
for method, endpoint, params in endpoints:
print(f"\nTesting {method} {endpoint}")
# Test with Bearer token
headers_bearer = {
"Authorization": f"Bearer {test_token}",
"X-Scan-ID": SCAN_ID
}
# Test with X-API-Key
headers_api_key = {
"X-API-Key": test_api_key,
"X-Scan-ID": SCAN_ID
}
url = f"{BASE_URL}{endpoint}"
# Make requests (will fail without valid tokens, but shows the headers work)
if method == "GET":
response_bearer = requests.get(url, params=params, headers=headers_bearer)
response_api_key = requests.get(url, params=params, headers=headers_api_key)
print(f" Bearer auth: {response_bearer.status_code}")
print(f" X-API-Key auth: {response_api_key.status_code}")
def main():
"""Main test function"""
print("CyberPanel AI Scanner Authentication Test")
print("="*50)
if len(sys.argv) > 1:
# If token provided as argument, use it
token = sys.argv[1]
# Test both authentication methods with the same token
# (assumes token is valid for both methods)
bearer_success = test_bearer_auth(token)
api_key_success = test_api_key_auth(token)
print("\n" + "="*50)
print("Test Results:")
print(f" Bearer authentication: {'✓ PASS' if bearer_success else '✗ FAIL'}")
print(f" X-API-Key authentication: {'✓ PASS' if api_key_success else '✗ FAIL'}")
print("="*50)
else:
# Run mock tests to show the endpoints accept both header formats
test_mixed_endpoints()
print("\n" + "="*50)
print("Note: To run real tests, provide a valid token:")
print(f" python {sys.argv[0]} cp_your_token_here")
print("="*50)
if __name__ == "__main__":
main()

74
test_api_key_fix.sh Executable file
View File

@ -0,0 +1,74 @@
#!/bin/bash
# Test script to verify API key validation fix
# Configuration - adjust these values
# For remote testing, replace with your CyberPanel server URL
SERVER="${CYBERPANEL_SERVER:-http://localhost:8001}"
API_KEY="${CYBERPANEL_API_KEY:-cp_GrHf3ysP0SKhrEiazmqt3kRJA5KwOFQW8VJKcDQ8B5Bg}" # Your actual API key
SCAN_ID="${CYBERPANEL_SCAN_ID:-550e8400-e29b-41d4-a716-446655440000}" # A valid scan ID from your system
echo "Using server: $SERVER"
echo "Using API key: ${API_KEY:0:20}..."
echo "Using scan ID: $SCAN_ID"
echo ""
echo "=========================================="
echo "Testing CyberPanel API Key Validation Fix"
echo "=========================================="
echo ""
# Test 1: List API keys in the system
echo "1. Listing API keys in system..."
echo "---------------------------------"
curl -s "$SERVER/api/ai-scanner/list-api-keys/" | python3 -m json.tool
echo ""
# Test 2: Test authentication with X-API-Key header
echo "2. Testing X-API-Key authentication..."
echo "---------------------------------------"
curl -s -X POST "$SERVER/api/ai-scanner/test-auth/" \
-H "X-API-Key: $API_KEY" \
-H "X-Scan-ID: $SCAN_ID" \
-H "Content-Type: application/json" \
-d "{\"scan_id\": \"$SCAN_ID\"}" | python3 -m json.tool
echo ""
# Test 3: Test actual file operation with X-API-Key
echo "3. Testing file operation with X-API-Key..."
echo "--------------------------------------------"
RESPONSE=$(curl -s -w "\n%{http_code}" "$SERVER/api/scanner/get-file?file_path=wp-content/test.php" \
-H "X-API-Key: $API_KEY" \
-H "X-Scan-ID: $SCAN_ID")
HTTP_CODE=$(echo "$RESPONSE" | tail -n1)
BODY=$(echo "$RESPONSE" | head -n-1)
echo "HTTP Status: $HTTP_CODE"
echo "Response body:"
echo "$BODY" | python3 -m json.tool 2>/dev/null || echo "$BODY"
echo ""
# Test 4: Test with Bearer token (backward compatibility)
echo "4. Testing Bearer token (backward compatibility)..."
echo "----------------------------------------------------"
RESPONSE=$(curl -s -w "\n%{http_code}" "$SERVER/api/scanner/get-file?file_path=wp-content/test.php" \
-H "Authorization: Bearer $API_KEY" \
-H "X-Scan-ID: $SCAN_ID")
HTTP_CODE=$(echo "$RESPONSE" | tail -n1)
BODY=$(echo "$RESPONSE" | head -n-1)
echo "HTTP Status: $HTTP_CODE"
echo "Response body:"
echo "$BODY" | python3 -m json.tool 2>/dev/null || echo "$BODY"
echo ""
echo "=========================================="
echo "Test complete!"
echo ""
echo "Expected results:"
echo "- Test 1: Should show API keys in system"
echo "- Test 2: Should show validation success with detailed steps"
echo "- Test 3: Should return 200 or 404 (not 401)"
echo "- Test 4: Should also work with Bearer token"
echo "=========================================="

File diff suppressed because it is too large Load Diff

View File

@ -22,7 +22,7 @@
$scope.isAdmin = $scope.debug.is_admin;
$scope.wpSitesCount = $scope.debug.wp_sites_count;
$scope.currentPage = 1;
$scope.recordsToShow = 10;
$scope.recordsToShow = 50; // Default to 50 items
$scope.expandedSites = {}; // Track which sites are expanded
$scope.currentWP = null; // Store current WordPress site for password protection
@ -792,11 +792,9 @@
<div style="flex: 1;">
<input ng-model="searchText" placeholder="Search by title or URL..." class="form-control">
</div>
<div style="width: 120px;">
<select ng-model="recordsToShow" class="form-control" ng-change="updatePagination()">
<option>10</option>
<option>50</option>
<option>100</option>
<div style="width: 150px;">
<select ng-model="recordsToShow" class="form-control" ng-change="updatePagination()"
ng-options="option.value as option.label for option in [{value: 10, label: '10 items'}, {value: 50, label: '50 items'}, {value: 100, label: '100 items'}]">
</select>
</div>
</div>

View File

@ -153,7 +153,25 @@ class WebsiteManager:
WPDelete = WPSites.objects.get(pk=DeleteID)
if ACLManager.checkOwnership(WPDelete.owner.domain, admin, currentACL) == 1:
WPDelete.delete()
# Check if this is a staging site (referenced by WPStaging as wpsite)
staging_records = WPStaging.objects.filter(wpsite=WPDelete)
if staging_records.exists():
# This is a staging site - perform complete cleanup
staging_website = WPDelete.owner
# Use the same robust deletion method as regular websites
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/virtualHostUtilities.py"
execPath = execPath + " deleteVirtualHostConfigurations --virtualHostName " + staging_website.domain
ProcessUtilities.popenExecutioner(execPath)
# Delete all staging records
staging_records.delete() # Delete WPStaging records
WPDelete.delete() # Delete WPSites record
staging_website.delete() # Delete Websites record
else:
# Regular WP site deletion
WPDelete.delete()
except BaseException as msg:
pass
@ -216,10 +234,28 @@ class WebsiteManager:
if DeleteID != None:
wstagingDelete = WPStaging.objects.get(pk=DeleteID, owner=WPobj)
# Get the associated staging WPSites and Websites records
staging_wpsite = wstagingDelete.wpsite
staging_website = staging_wpsite.owner
# Delete the staging Websites record and all associated data BEFORE deleting DB records
# Use the same robust deletion method as regular websites
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/virtualHostUtilities.py"
execPath = execPath + " deleteVirtualHostConfigurations --virtualHostName " + staging_website.domain
ProcessUtilities.popenExecutioner(execPath)
# Delete the WPStaging record
wstagingDelete.delete()
# Delete the staging WPSites record
staging_wpsite.delete()
# Delete the staging Websites record
staging_website.delete()
except BaseException as msg:
da = str(msg)
logging.CyberCPLogFileWriter.writeToFile(f"Error cleaning up WP/Staging sites: {str(msg)}")
proc = httpProc(request, 'websiteFunctions/WPsiteHome.html',
Data, 'createDatabase')