diff --git a/README.md b/README.md
index 11a6adece..6dcd87f83 100755
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@
**Web Hosting Control Panel powered by OpenLiteSpeed**
Fast • Secure • Scalable — Simplify hosting management with style.
-**Version**: 2.5.5 • **Updated**: September 24, 2025
+**Version**: 2.5.5-dev • **Updated**: November 15, 2025
[](https://github.com/usmannasir/cyberpanel)
[](https://cyberpanel.net/KnowledgeBase/)
@@ -70,13 +70,13 @@ Fast • Secure • Scalable — Simplify hosting management with style.
| OS family | Recommended / Supported |
| -------------------------- | ----------------------: |
-| Ubuntu 24.04, 22.04, 20.04 | ✅ Recommended |
-| Debian 13, 12, 11 | ✅ Supported |
-| AlmaLinux 10, 9, 8 | ✅ Supported |
-| RockyLinux 9, 8 | ✅ Supported |
-| RHEL 9, 8 | ✅ Supported |
-| CloudLinux 9, 8 | ✅ Supported |
-| CentOS 7 | ⚠️ Legacy — EOL |
+| Ubuntu 24.04, 22.04, 20.04 | ✅ Recommended |
+| Debian 13, 12, 11 | ✅ Supported |
+| AlmaLinux 10, 9, 8 | ✅ Supported |
+| RockyLinux 9, 8 | ✅ Supported |
+| RHEL 9, 8 | ✅ Supported |
+| CloudLinux 9, 8 | ✅ Supported |
+| CentOS 7 | ⚠️ Legacy — EOL |
> CyberPanel targets x86\_64 only. Test the unsupported OS in staging first.
@@ -142,6 +142,12 @@ journalctl -u lscpd -f
---
+## Recent fixes
+
+* **15.11.2025** — Hardened MySQL password rotation: `mysqlUtilities.changePassword` now auto-resolves the backing MySQL account (user + host) even when `DBUsers` metadata is missing, preventing the historical `[mysqlUtilities.changePassword] can only concatenate str (not "int")` error. Regression tests live under `Test/mysqlUtilities/`, and you should restart `lscpd` after deploying the patch so the helper reloads.
+
+---
+
## Resources
* Official site: [https://cyberpanel.net](https://cyberpanel.net)
@@ -149,6 +155,7 @@ journalctl -u lscpd -f
* Community forum: [https://community.cyberpanel.net](https://community.cyberpanel.net)
* GitHub: [https://github.com/usmannasir/cyberpanel](https://github.com/usmannasir/cyberpanel)
* Guides folder: [guides](https://github.com/usmannasir/cyberpanel/blob/stable/guides/INDEX.md) (API, INSTALLATION, UPGRADE, TROUBLESHOOTING)
+
---
diff --git a/Test/databases/test_mysql_utilities.py b/Test/databases/test_mysql_utilities.py
deleted file mode 100644
index 449268aed..000000000
--- a/Test/databases/test_mysql_utilities.py
+++ /dev/null
@@ -1,103 +0,0 @@
-import sys
-import types
-import unittest
-from unittest import mock
-
-# Provide minimal stubs when running tests outside the target servers.
-if 'django' not in sys.modules:
- django_stub = types.ModuleType("django")
- django_stub.setup = lambda: None
- sys.modules['django'] = django_stub
-
-if 'MySQLdb' not in sys.modules:
- mysql_stub = types.ModuleType("MySQLdb")
- mysql_stub.connect = lambda *args, **kwargs: None
- cursors_stub = types.SimpleNamespace(SSCursor=object)
- mysql_stub.cursors = cursors_stub
- sys.modules['MySQLdb'] = mysql_stub
- sys.modules['MySQLdb.cursors'] = types.ModuleType("MySQLdb.cursors")
- sys.modules['MySQLdb.cursors'].SSCursor = object
-
-from plogical import mysqlUtilities
-
-
-class DummyConnection:
- def __init__(self, literal_side_effect=None):
- self.literal_calls = []
- self.literal_side_effect = literal_side_effect
- self.closed = False
-
- def literal(self, value):
- if self.literal_side_effect:
- raise self.literal_side_effect
- self.literal_calls.append(value)
- return f"'{value}'"
-
- def close(self):
- self.closed = True
-
-
-class DummyCursor:
- def __init__(self):
- self.executed = []
- self.fetchone_value = None
-
- def execute(self, query, params=None):
- self.executed.append((query, params))
-
- def fetchone(self):
- return self.fetchone_value
-
-
-class MysqlUtilitiesChangePasswordTests(unittest.TestCase):
- def test_numeric_password_is_coerced_and_sanitized(self):
- connection = DummyConnection()
- cursor = DummyCursor()
-
- with mock.patch.object(mysqlUtilities.mysqlUtilities, 'setupConnection', return_value=(connection, cursor)), \
- mock.patch.object(mysqlUtilities.mysqlUtilities, 'resolve_mysql_username', return_value='demo_user'), \
- mock.patch('plogical.mysqlUtilities.os.path.exists', return_value=False):
- result = mysqlUtilities.mysqlUtilities.changePassword('demo_user', 987654321)
-
- self.assertEqual(result, 1)
- self.assertIn('987654321', connection.literal_calls)
- self.assertEqual(len(cursor.executed), 2) # USE mysql + password update
- self.assertIn("PASSWORD('987654321')", cursor.executed[-1][0])
-
- def test_logs_error_when_literal_fails(self):
- connection = DummyConnection(literal_side_effect=ValueError("boom"))
- cursor = DummyCursor()
-
- with mock.patch.object(mysqlUtilities.mysqlUtilities, 'setupConnection', return_value=(connection, cursor)), \
- mock.patch.object(mysqlUtilities.mysqlUtilities, 'resolve_mysql_username', return_value='demo_user'), \
- mock.patch('plogical.mysqlUtilities.os.path.exists', return_value=False), \
- mock.patch('plogical.mysqlUtilities.logging.CyberCPLogFileWriter.writeToFile') as log_mock:
- result = mysqlUtilities.mysqlUtilities.changePassword('demo_user', 'secret')
-
- self.assertEqual(result, 0)
- log_calls = [call.args[0] for call in log_mock.call_args_list]
- self.assertTrue(any('[mysqlUtilities.changePassword.literal]' in msg for msg in log_calls))
-
-
-class MysqlUtilitiesResolveUserTests(unittest.TestCase):
- def test_resolves_username_when_orm_missing(self):
- class _StubManager:
- def get(self, *args, **kwargs):
- raise Exception("not found")
-
- dbusers_stub = types.SimpleNamespace(objects=_StubManager())
- databases_stub = types.SimpleNamespace(objects=_StubManager())
-
- cursor = DummyCursor()
- cursor.fetchone_value = None
-
- with mock.patch('plogical.mysqlUtilities.DBUsers', dbusers_stub, create=True), \
- mock.patch('plogical.mysqlUtilities.Databases', databases_stub, create=True):
- resolved = mysqlUtilities.mysqlUtilities.resolve_mysql_username('mystery_user', cursor)
-
- self.assertEqual(resolved, 'mystery_user')
-
-
-if __name__ == "__main__":
- unittest.main()
-
diff --git a/Test/install/check_imunifyav_route.py b/Test/install/check_imunifyav_route.py
deleted file mode 100644
index 5fefcdbd1..000000000
--- a/Test/install/check_imunifyav_route.py
+++ /dev/null
@@ -1,102 +0,0 @@
-#!/usr/bin/env python3
-"""
-Lightweight regression test for the /imunifyav route.
-
-Authenticates by injecting a valid CyberPanel session (using the first Administrator record),
-requests both /imunifyav and /ImunifyAV, and records the HTTP status alongside the
-imunify-antivirus service state.
-"""
-import json
-import os
-import subprocess
-import sys
-from datetime import datetime
-from pathlib import Path
-
-ROOT_DIR = Path(__file__).resolve().parents[2]
-sys.path.append(str(ROOT_DIR))
-
-os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CyberCP.settings")
-
-import django # noqa: E402
-django.setup()
-
-from django.test import Client # noqa: E402
-from loginSystem.models import Administrator # noqa: E402
-
-LOG_PATH = ROOT_DIR / "test_logs" / "imunifyav_route.log"
-LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
-
-
-def get_service_state():
- try:
- output = subprocess.check_output(
- ["systemctl", "is-active", "imunify-antivirus"],
- stderr=subprocess.STDOUT,
- timeout=10,
- text=True,
- ).strip()
- return output
- except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
- return "unavailable"
-
-
-def perform_request(client, path):
- try:
- response = client.get(path, follow=True)
- return response.status_code, None
- except Exception as exc: # pragma: no cover - diagnostic logging
- return None, str(exc)
-
-
-def build_client():
- admin = Administrator.objects.first()
- if not admin:
- return None, "No administrator records available to seed the session."
-
- client = Client()
- session = client.session
- session["userID"] = admin.pk
- session.save()
- return client, None
-
-
-def log_result(entry):
- entry["timestamp"] = datetime.utcnow().isoformat() + "Z"
- with open(LOG_PATH, "a", encoding="utf-8") as handle:
- handle.write(json.dumps(entry, ensure_ascii=False) + "\n")
-
-
-def main():
- client, client_error = build_client()
- service_state = get_service_state()
- results = {
- "module": "imunifyav_route_check",
- "service_state": service_state,
- "retry": 0,
- "errors": [],
- "responses": {},
- }
-
- if client is None:
- results["errors"].append(client_error or "Unable to initialize Django test client.")
- log_result(results)
- print(json.dumps(results, indent=2))
- sys.exit(1)
-
- for path in ("/imunifyav/", "/ImunifyAV/"):
- status_code, error = perform_request(client, path)
- results["responses"][path] = status_code
- if error:
- results["errors"].append(f"{path}: {error}")
-
- log_result(results)
- print(json.dumps(results, indent=2))
-
- if results["errors"]:
- sys.exit(1)
-
-
-if __name__ == "__main__":
- main()
-
diff --git a/plogical/mysqlUtilities.py b/plogical/mysqlUtilities.py
index 82c087a42..92b568e27 100644
--- a/plogical/mysqlUtilities.py
+++ b/plogical/mysqlUtilities.py
@@ -916,13 +916,16 @@ password=%s
cursor.execute("use mysql")
- if host != None:
+ resolved_user, resolved_host = mysqlUtilities._resolve_mysql_account(userName, cursor)
+
+ LOCALHOST = mysqlUtilities.LOCALHOST
+
+ if host is not None:
LOCALHOST = host
- else:
- LOCALHOST = mysqlUtilities.LOCALHOST
+ elif resolved_host:
+ LOCALHOST = resolved_host
password_value = '' if dbPassword is None else str(dbPassword)
- resolved_user = mysqlUtilities.resolve_mysql_username(userName, cursor)
sanitized_user = mysqlUtilities._sanitize_mysql_identifier(resolved_user)
sanitized_host = mysqlUtilities._sanitize_mysql_identifier(LOCALHOST)
@@ -950,6 +953,10 @@ password=%s
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(query)
+ logging.CyberCPLogFileWriter.writeToFile(
+ "Resolved MySQL account %s@%s for identifier %s. [mysqlUtilities.changePassword]" % (
+ sanitized_user, sanitized_host, userName))
+
cursor.execute(query)
connection.close()
@@ -964,32 +971,33 @@ password=%s
def fetchuser(databaseName):
try:
connection, cursor = mysqlUtilities.setupConnection()
- cursor.execute("use mysql")
- database = Databases.objects.get(dbName=databaseName)
- databaseName = databaseName.replace('_', '\_')
- query = "select user from db where db = '%s'" % (databaseName)
if connection == 0:
return 0
- cursor.execute(query)
- rows = cursor.fetchall()
- counter = 0
+ cursor.execute("use mysql")
+ resolved_user, resolved_host = mysqlUtilities._resolve_mysql_account(databaseName, cursor)
- for row in rows:
- if row[0].find('_') > -1:
- database.dbUser = row[0]
- database.save()
+ database = Databases.objects.get(dbName=databaseName)
- try:
- connection.close()
- except:
- pass
- message = 'Detected databaser user is %s for database %s.' % (row[0], databaseName)
- logging.CyberCPLogFileWriter.writeToFile(message)
- return row[0]
- else:
- counter = counter + 1
+ if resolved_user and resolved_user.find('_') > -1:
+ database.dbUser = resolved_user
+ database.save()
+
+ host_message = resolved_host if resolved_host else mysqlUtilities.LOCALHOST
+ message = 'Detected database user %s@%s for database %s.' % (
+ resolved_user,
+ host_message,
+ databaseName
+ )
+ logging.CyberCPLogFileWriter.writeToFile(message)
+
+ try:
+ connection.close()
+ except:
+ pass
+
+ return resolved_user
connection.close()
@@ -1005,6 +1013,101 @@ password=%s
return ''
return str(value).replace("'", "''").strip()
+ @staticmethod
+ def _pick_host(host_candidates, fallback_host=None):
+ hosts = []
+ if host_candidates:
+ for host in host_candidates:
+ if host is None:
+ continue
+ host_value = str(host).strip()
+ if host_value:
+ hosts.append(host_value)
+
+ priority = []
+ if fallback_host:
+ priority.append(str(fallback_host).strip())
+ priority.extend([mysqlUtilities.LOCALHOST, 'localhost', '127.0.0.1'])
+
+ for candidate in priority:
+ if candidate and candidate in hosts:
+ return candidate
+
+ if '%' in hosts:
+ return '%'
+
+ if hosts:
+ return hosts[0]
+
+ if fallback_host:
+ return fallback_host
+
+ return mysqlUtilities.LOCALHOST
+
+ @staticmethod
+ def _resolve_mysql_account(identifier, cursor=None):
+ resolved_user = mysqlUtilities.resolve_mysql_username(identifier, cursor)
+ identifier_value = '' if identifier is None else str(identifier).strip()
+ host_candidates = []
+
+ try:
+ if DBUsers:
+ query = DBUsers.objects.filter(user=resolved_user)
+ for entry in query:
+ if getattr(entry, 'host', None):
+ host_candidates.append(entry.host)
+ except BaseException as msg:
+ logging.CyberCPLogFileWriter.writeToFile('%s [mysqlUtilities._resolve_mysql_account.dbusers]' % (str(msg)))
+
+ def _query_mysql_db(column, value):
+ hosts = []
+ updated_user = resolved_user
+
+ if cursor is None or not value:
+ return hosts, updated_user
+
+ try:
+ query = "SELECT user, host FROM mysql.db WHERE {0} = %s".format(column)
+ cursor.execute(query, (value,))
+ rows = cursor.fetchall() or []
+
+ for row in rows:
+ user_value = None
+ host_value = None
+
+ if len(row) > 0:
+ user_value = row[0]
+
+ if len(row) > 1:
+ host_value = row[1]
+
+ if host_value:
+ if user_value:
+ updated_user = user_value
+ hosts.append(host_value)
+
+ return hosts, updated_user
+ except BaseException as msg:
+ logging.CyberCPLogFileWriter.writeToFile('%s [mysqlUtilities._resolve_mysql_account.%s]' % (str(msg), column))
+ return hosts, updated_user
+
+ if not host_candidates:
+ hosts, resolved_user = _query_mysql_db('user', resolved_user)
+ host_candidates.extend(hosts)
+
+ if not host_candidates and identifier_value:
+ hosts, resolved_user = _query_mysql_db('db', identifier_value)
+ host_candidates.extend(hosts)
+
+ selected_host = mysqlUtilities._pick_host(host_candidates, mysqlUtilities.LOCALHOST)
+
+ if not host_candidates:
+ logging.CyberCPLogFileWriter.writeToFile(
+ 'Host resolution fallback in use for MySQL user %s (identifier: %s).'
+ ' [mysqlUtilities._resolve_mysql_account]' % (resolved_user, identifier_value if identifier_value else resolved_user))
+
+ return resolved_user, selected_host
+
@staticmethod
def resolve_mysql_username(identifier, cursor=None):
"""