Merge branch 'v2.4.4' into v2.4.4-dev

This commit is contained in:
usmannasir 2025-12-18 12:18:37 +05:00
commit 94bf2bdeef
3 changed files with 161 additions and 17 deletions

View File

@ -2,6 +2,7 @@
import os
import os.path
import sys
import re
from io import StringIO
import django
@ -784,9 +785,35 @@ class BackupManager:
except:
finalDic['user'] = "root"
# SECURITY: Validate all inputs to prevent command injection
if ACLManager.commandInjectionCheck(finalDic['ipAddress']) == 1:
final_dic = {'status': 0, 'destStatus': 0, 'error_message': 'Invalid characters in IP address'}
return HttpResponse(json.dumps(final_dic))
if ACLManager.commandInjectionCheck(finalDic['password']) == 1:
final_dic = {'status': 0, 'destStatus': 0, 'error_message': 'Invalid characters in password'}
return HttpResponse(json.dumps(final_dic))
if ACLManager.commandInjectionCheck(finalDic['port']) == 1:
final_dic = {'status': 0, 'destStatus': 0, 'error_message': 'Invalid characters in port'}
return HttpResponse(json.dumps(final_dic))
if ACLManager.commandInjectionCheck(finalDic['user']) == 1:
final_dic = {'status': 0, 'destStatus': 0, 'error_message': 'Invalid characters in username'}
return HttpResponse(json.dumps(final_dic))
# SECURITY: Validate port is numeric
try:
port_int = int(finalDic['port'])
if port_int < 1 or port_int > 65535:
raise ValueError("Port out of range")
except ValueError:
final_dic = {'status': 0, 'destStatus': 0, 'error_message': 'Port must be a valid number (1-65535)'}
return HttpResponse(json.dumps(final_dic))
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/backupUtilities.py"
execPath = execPath + " submitDestinationCreation --ipAddress " + finalDic['ipAddress'] + " --password " \
+ finalDic['password'] + " --port " + finalDic['port'] + ' --user %s' % (finalDic['user'])
execPath = execPath + " submitDestinationCreation --ipAddress " + shlex.quote(finalDic['ipAddress']) + " --password " \
+ shlex.quote(finalDic['password']) + " --port " + shlex.quote(finalDic['port']) + ' --user %s' % (shlex.quote(finalDic['user']))
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(execPath)
@ -880,8 +907,13 @@ class BackupManager:
ipAddress = data['IPAddress']
# SECURITY: Validate IP address to prevent command injection
if ACLManager.commandInjectionCheck(ipAddress) == 1:
final_dic = {'connStatus': 0, 'error_message': 'Invalid characters in IP address'}
return HttpResponse(json.dumps(final_dic))
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/backupUtilities.py"
execPath = execPath + " getConnectionStatus --ipAddress " + ipAddress
execPath = execPath + " getConnectionStatus --ipAddress " + shlex.quote(ipAddress)
output = ProcessUtilities.executioner(execPath)
@ -1342,16 +1374,32 @@ class BackupManager:
if ACLManager.currentContextPermission(currentACL, 'remoteBackups') == 0:
return ACLManager.loadErrorJson('remoteTransferStatus', 0)
backupDir = data['backupDir']
backupDir = str(data['backupDir'])
backupDirComplete = "/home/backup/transfer-" + str(backupDir)
# adminEmail = admin.email
# SECURITY: Validate backupDir to prevent command injection and path traversal
if ACLManager.commandInjectionCheck(backupDir) == 1:
data = {'remoteRestoreStatus': 0, 'error_message': 'Invalid characters in backup directory name'}
return HttpResponse(json.dumps(data))
##
# SECURITY: Ensure backupDir is alphanumeric only (backup dirs are typically numeric IDs)
if not re.match(r'^[a-zA-Z0-9_-]+$', backupDir):
data = {'remoteRestoreStatus': 0, 'error_message': 'Backup directory name must be alphanumeric'}
return HttpResponse(json.dumps(data))
# SECURITY: Prevent path traversal
if '..' in backupDir or '/' in backupDir:
data = {'remoteRestoreStatus': 0, 'error_message': 'Invalid backup directory path'}
return HttpResponse(json.dumps(data))
backupDirComplete = "/home/backup/transfer-" + backupDir
# SECURITY: Verify the backup directory exists
if not os.path.exists(backupDirComplete):
data = {'remoteRestoreStatus': 0, 'error_message': 'Backup directory does not exist'}
return HttpResponse(json.dumps(data))
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/remoteTransferUtilities.py"
execPath = execPath + " remoteBackupRestore --backupDirComplete " + backupDirComplete + " --backupDir " + str(
backupDir)
execPath = execPath + " remoteBackupRestore --backupDirComplete " + shlex.quote(backupDirComplete) + " --backupDir " + shlex.quote(backupDir)
ProcessUtilities.popenExecutioner(execPath)
@ -1373,16 +1421,35 @@ class BackupManager:
if ACLManager.currentContextPermission(currentACL, 'remoteBackups') == 0:
return ACLManager.loadErrorJson('remoteTransferStatus', 0)
backupDir = data['backupDir']
backupDir = str(data['backupDir'])
# SECURITY: Validate backupDir to prevent command injection and path traversal
if ACLManager.commandInjectionCheck(backupDir) == 1:
data = {'remoteTransferStatus': 0, 'error_message': 'Invalid characters in backup directory name', "status": "None", "complete": 0}
return HttpResponse(json.dumps(data))
# SECURITY: Ensure backupDir is alphanumeric only
if not re.match(r'^[a-zA-Z0-9_-]+$', backupDir):
data = {'remoteTransferStatus': 0, 'error_message': 'Backup directory name must be alphanumeric', "status": "None", "complete": 0}
return HttpResponse(json.dumps(data))
# SECURITY: Prevent path traversal
if '..' in backupDir or '/' in backupDir:
data = {'remoteTransferStatus': 0, 'error_message': 'Invalid backup directory path', "status": "None", "complete": 0}
return HttpResponse(json.dumps(data))
# admin = Administrator.objects.get(userName=username)
backupLogPath = "/home/backup/transfer-" + backupDir + "/" + "backup_log"
removalPath = "/home/backup/transfer-" + backupDir
removalPath = "/home/backup/transfer-" + str(backupDir)
# SECURITY: Verify the backup directory exists before operating on it
if not os.path.exists(removalPath):
data = {'remoteTransferStatus': 0, 'error_message': 'Backup directory does not exist', "status": "None", "complete": 0}
return HttpResponse(json.dumps(data))
time.sleep(3)
command = "sudo cat " + backupLogPath
command = "sudo cat " + shlex.quote(backupLogPath)
status = ProcessUtilities.outputExecutioner(command)
@ -1393,14 +1460,14 @@ class BackupManager:
if status.find("completed[success]") > -1:
command = "rm -rf " + removalPath
command = "rm -rf " + shlex.quote(removalPath)
ProcessUtilities.executioner(command)
data_ret = {'remoteTransferStatus': 1, 'error_message': "None", "status": status, "complete": 1}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
elif status.find("[5010]") > -1:
command = "sudo rm -rf " + removalPath
command = "sudo rm -rf " + shlex.quote(removalPath)
ProcessUtilities.executioner(command)
data = {'remoteTransferStatus': 0, 'error_message': status,
"status": "None", "complete": 0}

View File

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import os
from django.shortcuts import render,redirect
from loginSystem.models import Administrator
from loginSystem.views import loadLoginPage
@ -326,6 +327,23 @@ def downloadFile(request):
if fileToDownload.find('..') > -1 or fileToDownload.find(homePath) == -1:
return HttpResponse("Unauthorized access.")
# SECURITY: Check for symlink attacks - resolve the real path and verify it stays within homePath
try:
realPath = os.path.realpath(fileToDownload)
# Verify the resolved path is still within the user's home directory
if not realPath.startswith(homePath + '/') and realPath != homePath:
logging.CyberCPLogFileWriter.writeToFile(
f"Symlink attack blocked: {fileToDownload} -> {realPath} (outside {homePath})")
return HttpResponse("Unauthorized access: Symlink points outside allowed directory.")
# Verify it's a regular file
if not os.path.isfile(realPath):
return HttpResponse("Unauthorized access: Not a valid file.")
except OSError as e:
return HttpResponse("Unauthorized access: Cannot verify file path.")
response = HttpResponse(content_type='application/force-download')
response['Content-Disposition'] = 'attachment; filename=%s' % (fileToDownload.split('/')[-1])
response['X-LiteSpeed-Location'] = '%s' % (fileToDownload)
@ -351,6 +369,37 @@ def RootDownloadFile(request):
else:
return ACLManager.loadError()
# SECURITY: Prevent path traversal attacks
if fileToDownload.find('..') > -1:
return HttpResponse("Unauthorized access: Path traversal detected.")
# SECURITY: Check for symlink attacks - resolve the real path and verify it's safe
try:
# Get the real path (resolves symlinks)
realPath = os.path.realpath(fileToDownload)
# SECURITY: Prevent access to sensitive system files
sensitive_paths = ['/etc/shadow', '/etc/passwd', '/etc/sudoers', '/root/.ssh',
'/var/log', '/proc', '/sys', '/dev']
for sensitive in sensitive_paths:
if realPath.startswith(sensitive):
return HttpResponse("Unauthorized access: Access to system files denied.")
# SECURITY: Verify the file exists and is a regular file (not a directory or device)
if not os.path.isfile(realPath):
return HttpResponse("Unauthorized access: Not a valid file.")
# SECURITY: Check if the original path differs from real path (symlink detection)
# Allow the download only if the real path is within allowed directories
# For admin, we'll be more permissive but still block sensitive system files
if fileToDownload != realPath:
# This is a symlink - log it and verify destination is safe
logging.CyberCPLogFileWriter.writeToFile(
f"Symlink download detected: {fileToDownload} -> {realPath}")
except OSError as e:
return HttpResponse("Unauthorized access: Cannot verify file path.")
response = HttpResponse(content_type='application/force-download')
response['Content-Disposition'] = 'attachment; filename=%s' % (fileToDownload.split('/')[-1])
response['X-LiteSpeed-Location'] = '%s' % (fileToDownload)

View File

@ -1,5 +1,6 @@
from plogical import CyberCPLogFileWriter as logging
import os
import re
import requests
import json
import time
@ -9,6 +10,7 @@ import shlex
from multiprocessing import Process
from plogical.backupSchedule import backupSchedule
from shutil import rmtree
from plogical.acl import ACLManager
class remoteBackup:
@ -216,16 +218,42 @@ class remoteBackup:
@staticmethod
def sendBackup(completedPathToSend, IPAddress, folderNumber,writeToFile):
def sendBackup(completedPathToSend, IPAddress, folderNumber, writeToFile):
try:
## complete path is a path to the file need to send
command = 'sudo rsync -avz -e "ssh -i /root/.ssh/cyberpanel -o StrictHostKeyChecking=no" ' + completedPathToSend + ' root@' + IPAddress + ':/home/backup/transfer-'+folderNumber
# SECURITY: Validate IPAddress to prevent command injection
if ACLManager.commandInjectionCheck(IPAddress) == 1:
logging.CyberCPLogFileWriter.writeToFile("Invalid IP address - command injection attempt detected [sendBackup]")
return
# SECURITY: Validate IPAddress format (IPv4 or hostname)
ip_pattern = r'^[a-zA-Z0-9][a-zA-Z0-9.-]*[a-zA-Z0-9]$|^[a-zA-Z0-9]$'
if not re.match(ip_pattern, IPAddress):
logging.CyberCPLogFileWriter.writeToFile("Invalid IP address format [sendBackup]")
return
# SECURITY: Validate folderNumber is alphanumeric
if ACLManager.commandInjectionCheck(str(folderNumber)) == 1:
logging.CyberCPLogFileWriter.writeToFile("Invalid folder number - command injection attempt detected [sendBackup]")
return
if not re.match(r'^[a-zA-Z0-9_-]+$', str(folderNumber)):
logging.CyberCPLogFileWriter.writeToFile("Invalid folder number format [sendBackup]")
return
# SECURITY: Validate completedPathToSend - must be under /home/backup
if '..' in completedPathToSend or not completedPathToSend.startswith('/home/backup/'):
logging.CyberCPLogFileWriter.writeToFile("Invalid backup path - path traversal attempt detected [sendBackup]")
return
# SECURITY: Use shlex.quote for all user-controllable parameters
command = 'sudo rsync -avz -e "ssh -i /root/.ssh/cyberpanel -o StrictHostKeyChecking=no" ' + shlex.quote(completedPathToSend) + ' root@' + shlex.quote(IPAddress) + ':/home/backup/transfer-' + shlex.quote(str(folderNumber))
subprocess.call(shlex.split(command), stdout=writeToFile)
os.remove(completedPathToSend)
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [startBackup]")
logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [sendBackup]")
@staticmethod
def backupProcess(ipAddress, dir, backupLogPath,folderNumber, accountsToTransfer):