diff --git a/README.md b/README.md index 11a6adece..6dcd87f83 100755 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ **Web Hosting Control Panel powered by OpenLiteSpeed** Fast • Secure • Scalable — Simplify hosting management with style. -**Version**: 2.5.5 • **Updated**: September 24, 2025 +**Version**: 2.5.5-dev • **Updated**: November 15, 2025 [![GitHub](https://img.shields.io/badge/GitHub-Repo-000?style=flat-square\&logo=github)](https://github.com/usmannasir/cyberpanel) [![Docs](https://img.shields.io/badge/Docs-Read-green?style=flat-square\&logo=gitbook)](https://cyberpanel.net/KnowledgeBase/) @@ -70,13 +70,13 @@ Fast • Secure • Scalable — Simplify hosting management with style. | OS family | Recommended / Supported | | -------------------------- | ----------------------: | -| Ubuntu 24.04, 22.04, 20.04 | ✅ Recommended | -| Debian 13, 12, 11 | ✅ Supported | -| AlmaLinux 10, 9, 8 | ✅ Supported | -| RockyLinux 9, 8 | ✅ Supported | -| RHEL 9, 8 | ✅ Supported | -| CloudLinux 9, 8 | ✅ Supported | -| CentOS 7 | ⚠️ Legacy — EOL | +| Ubuntu 24.04, 22.04, 20.04 | ✅ Recommended | +| Debian 13, 12, 11 | ✅ Supported | +| AlmaLinux 10, 9, 8 | ✅ Supported | +| RockyLinux 9, 8 | ✅ Supported | +| RHEL 9, 8 | ✅ Supported | +| CloudLinux 9, 8 | ✅ Supported | +| CentOS 7 | ⚠️ Legacy — EOL | > CyberPanel targets x86\_64 only. Test the unsupported OS in staging first. @@ -142,6 +142,12 @@ journalctl -u lscpd -f --- +## Recent fixes + +* **15.11.2025** — Hardened MySQL password rotation: `mysqlUtilities.changePassword` now auto-resolves the backing MySQL account (user + host) even when `DBUsers` metadata is missing, preventing the historical `[mysqlUtilities.changePassword] can only concatenate str (not "int")` error. Regression tests live under `Test/mysqlUtilities/`, and you should restart `lscpd` after deploying the patch so the helper reloads. + +--- + ## Resources * Official site: [https://cyberpanel.net](https://cyberpanel.net) @@ -149,6 +155,7 @@ journalctl -u lscpd -f * Community forum: [https://community.cyberpanel.net](https://community.cyberpanel.net) * GitHub: [https://github.com/usmannasir/cyberpanel](https://github.com/usmannasir/cyberpanel) * Guides folder: [guides](https://github.com/usmannasir/cyberpanel/blob/stable/guides/INDEX.md) (API, INSTALLATION, UPGRADE, TROUBLESHOOTING) + ---
diff --git a/Test/databases/test_mysql_utilities.py b/Test/databases/test_mysql_utilities.py deleted file mode 100644 index 449268aed..000000000 --- a/Test/databases/test_mysql_utilities.py +++ /dev/null @@ -1,103 +0,0 @@ -import sys -import types -import unittest -from unittest import mock - -# Provide minimal stubs when running tests outside the target servers. -if 'django' not in sys.modules: - django_stub = types.ModuleType("django") - django_stub.setup = lambda: None - sys.modules['django'] = django_stub - -if 'MySQLdb' not in sys.modules: - mysql_stub = types.ModuleType("MySQLdb") - mysql_stub.connect = lambda *args, **kwargs: None - cursors_stub = types.SimpleNamespace(SSCursor=object) - mysql_stub.cursors = cursors_stub - sys.modules['MySQLdb'] = mysql_stub - sys.modules['MySQLdb.cursors'] = types.ModuleType("MySQLdb.cursors") - sys.modules['MySQLdb.cursors'].SSCursor = object - -from plogical import mysqlUtilities - - -class DummyConnection: - def __init__(self, literal_side_effect=None): - self.literal_calls = [] - self.literal_side_effect = literal_side_effect - self.closed = False - - def literal(self, value): - if self.literal_side_effect: - raise self.literal_side_effect - self.literal_calls.append(value) - return f"'{value}'" - - def close(self): - self.closed = True - - -class DummyCursor: - def __init__(self): - self.executed = [] - self.fetchone_value = None - - def execute(self, query, params=None): - self.executed.append((query, params)) - - def fetchone(self): - return self.fetchone_value - - -class MysqlUtilitiesChangePasswordTests(unittest.TestCase): - def test_numeric_password_is_coerced_and_sanitized(self): - connection = DummyConnection() - cursor = DummyCursor() - - with mock.patch.object(mysqlUtilities.mysqlUtilities, 'setupConnection', return_value=(connection, cursor)), \ - mock.patch.object(mysqlUtilities.mysqlUtilities, 'resolve_mysql_username', return_value='demo_user'), \ - mock.patch('plogical.mysqlUtilities.os.path.exists', return_value=False): - result = mysqlUtilities.mysqlUtilities.changePassword('demo_user', 987654321) - - self.assertEqual(result, 1) - self.assertIn('987654321', connection.literal_calls) - self.assertEqual(len(cursor.executed), 2) # USE mysql + password update - self.assertIn("PASSWORD('987654321')", cursor.executed[-1][0]) - - def test_logs_error_when_literal_fails(self): - connection = DummyConnection(literal_side_effect=ValueError("boom")) - cursor = DummyCursor() - - with mock.patch.object(mysqlUtilities.mysqlUtilities, 'setupConnection', return_value=(connection, cursor)), \ - mock.patch.object(mysqlUtilities.mysqlUtilities, 'resolve_mysql_username', return_value='demo_user'), \ - mock.patch('plogical.mysqlUtilities.os.path.exists', return_value=False), \ - mock.patch('plogical.mysqlUtilities.logging.CyberCPLogFileWriter.writeToFile') as log_mock: - result = mysqlUtilities.mysqlUtilities.changePassword('demo_user', 'secret') - - self.assertEqual(result, 0) - log_calls = [call.args[0] for call in log_mock.call_args_list] - self.assertTrue(any('[mysqlUtilities.changePassword.literal]' in msg for msg in log_calls)) - - -class MysqlUtilitiesResolveUserTests(unittest.TestCase): - def test_resolves_username_when_orm_missing(self): - class _StubManager: - def get(self, *args, **kwargs): - raise Exception("not found") - - dbusers_stub = types.SimpleNamespace(objects=_StubManager()) - databases_stub = types.SimpleNamespace(objects=_StubManager()) - - cursor = DummyCursor() - cursor.fetchone_value = None - - with mock.patch('plogical.mysqlUtilities.DBUsers', dbusers_stub, create=True), \ - mock.patch('plogical.mysqlUtilities.Databases', databases_stub, create=True): - resolved = mysqlUtilities.mysqlUtilities.resolve_mysql_username('mystery_user', cursor) - - self.assertEqual(resolved, 'mystery_user') - - -if __name__ == "__main__": - unittest.main() - diff --git a/Test/install/check_imunifyav_route.py b/Test/install/check_imunifyav_route.py deleted file mode 100644 index 5fefcdbd1..000000000 --- a/Test/install/check_imunifyav_route.py +++ /dev/null @@ -1,102 +0,0 @@ -#!/usr/bin/env python3 -""" -Lightweight regression test for the /imunifyav route. - -Authenticates by injecting a valid CyberPanel session (using the first Administrator record), -requests both /imunifyav and /ImunifyAV, and records the HTTP status alongside the -imunify-antivirus service state. -""" -import json -import os -import subprocess -import sys -from datetime import datetime -from pathlib import Path - -ROOT_DIR = Path(__file__).resolve().parents[2] -sys.path.append(str(ROOT_DIR)) - -os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CyberCP.settings") - -import django # noqa: E402 -django.setup() - -from django.test import Client # noqa: E402 -from loginSystem.models import Administrator # noqa: E402 - -LOG_PATH = ROOT_DIR / "test_logs" / "imunifyav_route.log" -LOG_PATH.parent.mkdir(parents=True, exist_ok=True) - - -def get_service_state(): - try: - output = subprocess.check_output( - ["systemctl", "is-active", "imunify-antivirus"], - stderr=subprocess.STDOUT, - timeout=10, - text=True, - ).strip() - return output - except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired): - return "unavailable" - - -def perform_request(client, path): - try: - response = client.get(path, follow=True) - return response.status_code, None - except Exception as exc: # pragma: no cover - diagnostic logging - return None, str(exc) - - -def build_client(): - admin = Administrator.objects.first() - if not admin: - return None, "No administrator records available to seed the session." - - client = Client() - session = client.session - session["userID"] = admin.pk - session.save() - return client, None - - -def log_result(entry): - entry["timestamp"] = datetime.utcnow().isoformat() + "Z" - with open(LOG_PATH, "a", encoding="utf-8") as handle: - handle.write(json.dumps(entry, ensure_ascii=False) + "\n") - - -def main(): - client, client_error = build_client() - service_state = get_service_state() - results = { - "module": "imunifyav_route_check", - "service_state": service_state, - "retry": 0, - "errors": [], - "responses": {}, - } - - if client is None: - results["errors"].append(client_error or "Unable to initialize Django test client.") - log_result(results) - print(json.dumps(results, indent=2)) - sys.exit(1) - - for path in ("/imunifyav/", "/ImunifyAV/"): - status_code, error = perform_request(client, path) - results["responses"][path] = status_code - if error: - results["errors"].append(f"{path}: {error}") - - log_result(results) - print(json.dumps(results, indent=2)) - - if results["errors"]: - sys.exit(1) - - -if __name__ == "__main__": - main() - diff --git a/plogical/mysqlUtilities.py b/plogical/mysqlUtilities.py index 82c087a42..92b568e27 100644 --- a/plogical/mysqlUtilities.py +++ b/plogical/mysqlUtilities.py @@ -916,13 +916,16 @@ password=%s cursor.execute("use mysql") - if host != None: + resolved_user, resolved_host = mysqlUtilities._resolve_mysql_account(userName, cursor) + + LOCALHOST = mysqlUtilities.LOCALHOST + + if host is not None: LOCALHOST = host - else: - LOCALHOST = mysqlUtilities.LOCALHOST + elif resolved_host: + LOCALHOST = resolved_host password_value = '' if dbPassword is None else str(dbPassword) - resolved_user = mysqlUtilities.resolve_mysql_username(userName, cursor) sanitized_user = mysqlUtilities._sanitize_mysql_identifier(resolved_user) sanitized_host = mysqlUtilities._sanitize_mysql_identifier(LOCALHOST) @@ -950,6 +953,10 @@ password=%s if os.path.exists(ProcessUtilities.debugPath): logging.CyberCPLogFileWriter.writeToFile(query) + logging.CyberCPLogFileWriter.writeToFile( + "Resolved MySQL account %s@%s for identifier %s. [mysqlUtilities.changePassword]" % ( + sanitized_user, sanitized_host, userName)) + cursor.execute(query) connection.close() @@ -964,32 +971,33 @@ password=%s def fetchuser(databaseName): try: connection, cursor = mysqlUtilities.setupConnection() - cursor.execute("use mysql") - database = Databases.objects.get(dbName=databaseName) - databaseName = databaseName.replace('_', '\_') - query = "select user from db where db = '%s'" % (databaseName) if connection == 0: return 0 - cursor.execute(query) - rows = cursor.fetchall() - counter = 0 + cursor.execute("use mysql") + resolved_user, resolved_host = mysqlUtilities._resolve_mysql_account(databaseName, cursor) - for row in rows: - if row[0].find('_') > -1: - database.dbUser = row[0] - database.save() + database = Databases.objects.get(dbName=databaseName) - try: - connection.close() - except: - pass - message = 'Detected databaser user is %s for database %s.' % (row[0], databaseName) - logging.CyberCPLogFileWriter.writeToFile(message) - return row[0] - else: - counter = counter + 1 + if resolved_user and resolved_user.find('_') > -1: + database.dbUser = resolved_user + database.save() + + host_message = resolved_host if resolved_host else mysqlUtilities.LOCALHOST + message = 'Detected database user %s@%s for database %s.' % ( + resolved_user, + host_message, + databaseName + ) + logging.CyberCPLogFileWriter.writeToFile(message) + + try: + connection.close() + except: + pass + + return resolved_user connection.close() @@ -1005,6 +1013,101 @@ password=%s return '' return str(value).replace("'", "''").strip() + @staticmethod + def _pick_host(host_candidates, fallback_host=None): + hosts = [] + if host_candidates: + for host in host_candidates: + if host is None: + continue + host_value = str(host).strip() + if host_value: + hosts.append(host_value) + + priority = [] + if fallback_host: + priority.append(str(fallback_host).strip()) + priority.extend([mysqlUtilities.LOCALHOST, 'localhost', '127.0.0.1']) + + for candidate in priority: + if candidate and candidate in hosts: + return candidate + + if '%' in hosts: + return '%' + + if hosts: + return hosts[0] + + if fallback_host: + return fallback_host + + return mysqlUtilities.LOCALHOST + + @staticmethod + def _resolve_mysql_account(identifier, cursor=None): + resolved_user = mysqlUtilities.resolve_mysql_username(identifier, cursor) + identifier_value = '' if identifier is None else str(identifier).strip() + host_candidates = [] + + try: + if DBUsers: + query = DBUsers.objects.filter(user=resolved_user) + for entry in query: + if getattr(entry, 'host', None): + host_candidates.append(entry.host) + except BaseException as msg: + logging.CyberCPLogFileWriter.writeToFile('%s [mysqlUtilities._resolve_mysql_account.dbusers]' % (str(msg))) + + def _query_mysql_db(column, value): + hosts = [] + updated_user = resolved_user + + if cursor is None or not value: + return hosts, updated_user + + try: + query = "SELECT user, host FROM mysql.db WHERE {0} = %s".format(column) + cursor.execute(query, (value,)) + rows = cursor.fetchall() or [] + + for row in rows: + user_value = None + host_value = None + + if len(row) > 0: + user_value = row[0] + + if len(row) > 1: + host_value = row[1] + + if host_value: + if user_value: + updated_user = user_value + hosts.append(host_value) + + return hosts, updated_user + except BaseException as msg: + logging.CyberCPLogFileWriter.writeToFile('%s [mysqlUtilities._resolve_mysql_account.%s]' % (str(msg), column)) + return hosts, updated_user + + if not host_candidates: + hosts, resolved_user = _query_mysql_db('user', resolved_user) + host_candidates.extend(hosts) + + if not host_candidates and identifier_value: + hosts, resolved_user = _query_mysql_db('db', identifier_value) + host_candidates.extend(hosts) + + selected_host = mysqlUtilities._pick_host(host_candidates, mysqlUtilities.LOCALHOST) + + if not host_candidates: + logging.CyberCPLogFileWriter.writeToFile( + 'Host resolution fallback in use for MySQL user %s (identifier: %s).' + ' [mysqlUtilities._resolve_mysql_account]' % (resolved_user, identifier_value if identifier_value else resolved_user)) + + return resolved_user, selected_host + @staticmethod def resolve_mysql_username(identifier, cursor=None): """