diff --git a/aiScanner/api.py b/aiScanner/api.py index 42a2c3405..083b6a1da 100644 --- a/aiScanner/api.py +++ b/aiScanner/api.py @@ -29,6 +29,27 @@ class AuthWrapper: self.source_obj = source_obj # Original FileAccessToken or AIScannerSettings object +def extract_auth_token(request): + """ + Extract authentication token from either Bearer or X-API-Key header + + Returns: (token, auth_type) where auth_type is 'bearer' or 'api_key' + """ + # Check for X-API-Key header first (preferred for permanent auth) + api_key_header = request.META.get('HTTP_X_API_KEY', '') + if api_key_header: + logging.writeToFile(f'[API] Using X-API-Key authentication') + return api_key_header, 'api_key' + + # Check for Bearer token (backward compatibility) + auth_header = request.META.get('HTTP_AUTHORIZATION', '') + if auth_header.startswith('Bearer '): + logging.writeToFile(f'[API] Using Bearer token authentication') + return auth_header.replace('Bearer ', ''), 'bearer' + + return None, None + + def validate_access_token(token, scan_id): """ Validate authentication token - accepts BOTH file access tokens and API keys @@ -309,14 +330,13 @@ def list_files(request): } """ try: - # Validate authorization - auth_header = request.META.get('HTTP_AUTHORIZATION', '') - if not auth_header.startswith('Bearer '): - return JsonResponse({'error': 'Missing or invalid Authorization header'}, status=401) - - access_token = auth_header.replace('Bearer ', '') + # Validate authorization (supports both Bearer token and X-API-Key) + access_token, auth_type = extract_auth_token(request) + if not access_token: + return JsonResponse({'error': 'Missing or invalid Authorization header. Use Bearer token or X-API-Key header'}, status=401) + scan_id = request.META.get('HTTP_X_SCAN_ID', '') - + if not scan_id: return JsonResponse({'error': 'X-Scan-ID header required'}, status=400) @@ -436,14 +456,13 @@ def get_file_content(request): } """ try: - # Validate authorization - auth_header = request.META.get('HTTP_AUTHORIZATION', '') - if not auth_header.startswith('Bearer '): - return JsonResponse({'error': 'Missing or invalid Authorization header'}, status=401) - - access_token = auth_header.replace('Bearer ', '') + # Validate authorization (supports both Bearer token and X-API-Key) + access_token, auth_type = extract_auth_token(request) + if not access_token: + return JsonResponse({'error': 'Missing or invalid Authorization header. Use Bearer token or X-API-Key header'}, status=401) + scan_id = request.META.get('HTTP_X_SCAN_ID', '') - + if not scan_id: return JsonResponse({'error': 'X-Scan-ID header required'}, status=400) @@ -855,12 +874,11 @@ def scanner_backup_file(request): file_path = data.get('file_path', '').strip('/') scan_id = data.get('scan_id', '') - # Validate authorization - auth_header = request.META.get('HTTP_AUTHORIZATION', '') - if not auth_header.startswith('Bearer '): - return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header'}, status=401) + # Validate authorization (supports both Bearer token and X-API-Key) + access_token, auth_type = extract_auth_token(request) + if not access_token: + return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header. Use Bearer token or X-API-Key header'}, status=401) - access_token = auth_header.replace('Bearer ', '') header_scan_id = request.META.get('HTTP_X_SCAN_ID', '') if not scan_id or not header_scan_id or scan_id != header_scan_id: @@ -997,12 +1015,11 @@ def scanner_get_file(request): } """ try: - # Validate authorization - auth_header = request.META.get('HTTP_AUTHORIZATION', '') - if not auth_header.startswith('Bearer '): - return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header'}, status=401) + # Validate authorization (supports both Bearer token and X-API-Key) + access_token, auth_type = extract_auth_token(request) + if not access_token: + return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header. Use Bearer token or X-API-Key header'}, status=401) - access_token = auth_header.replace('Bearer ', '') scan_id = request.META.get('HTTP_X_SCAN_ID', '') if not scan_id: @@ -1177,12 +1194,11 @@ def scanner_replace_file(request): backup_before_replace = data.get('backup_before_replace', True) verify_hash = data.get('verify_hash', '') - # Validate authorization - auth_header = request.META.get('HTTP_AUTHORIZATION', '') - if not auth_header.startswith('Bearer '): - return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header'}, status=401) + # Validate authorization (supports both Bearer token and X-API-Key) + access_token, auth_type = extract_auth_token(request) + if not access_token: + return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header. Use Bearer token or X-API-Key header'}, status=401) - access_token = auth_header.replace('Bearer ', '') scan_id = request.META.get('HTTP_X_SCAN_ID', '') if not scan_id: @@ -1386,12 +1402,11 @@ def scanner_rename_file(request): new_path = data.get('new_path', '').strip('/') backup_before_rename = data.get('backup_before_rename', True) - # Validate authorization - auth_header = request.META.get('HTTP_AUTHORIZATION', '') - if not auth_header.startswith('Bearer '): - return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header'}, status=401) + # Validate authorization (supports both Bearer token and X-API-Key) + access_token, auth_type = extract_auth_token(request) + if not access_token: + return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header. Use Bearer token or X-API-Key header'}, status=401) - access_token = auth_header.replace('Bearer ', '') scan_id = request.META.get('HTTP_X_SCAN_ID', '') if not scan_id: @@ -1560,12 +1575,11 @@ def scanner_delete_file(request): 'message': 'Set confirm_deletion: true to proceed' }, status=400) - # Validate authorization - auth_header = request.META.get('HTTP_AUTHORIZATION', '') - if not auth_header.startswith('Bearer '): - return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header'}, status=401) + # Validate authorization (supports both Bearer token and X-API-Key) + access_token, auth_type = extract_auth_token(request) + if not access_token: + return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header. Use Bearer token or X-API-Key header'}, status=401) - access_token = auth_header.replace('Bearer ', '') scan_id = request.META.get('HTTP_X_SCAN_ID', '') if not scan_id: diff --git a/test_api_auth.py b/test_api_auth.py new file mode 100644 index 000000000..98d8ac2b3 --- /dev/null +++ b/test_api_auth.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 +""" +Test script to verify both Bearer token and X-API-Key authentication work +for CyberPanel AI Scanner file operations. +""" + +import requests +import json +import sys + +# Test configuration +BASE_URL = "http://localhost:8001" # Adjust if needed +SCAN_ID = "test-scan-123" +FILE_PATH = "wp-content/plugins/test.php" + +def test_bearer_auth(token): + """Test with Bearer token authentication""" + print("Testing Bearer token authentication...") + + headers = { + "Authorization": f"Bearer {token}", + "X-Scan-ID": SCAN_ID, + "Content-Type": "application/json" + } + + # Test get-file endpoint + url = f"{BASE_URL}/api/scanner/get-file" + params = {"file_path": FILE_PATH} + + response = requests.get(url, params=params, headers=headers) + print(f"Bearer auth response: {response.status_code}") + if response.status_code != 200: + print(f"Response: {response.text}") + return response.status_code == 200 or response.status_code == 404 # 404 is ok if file doesn't exist + + +def test_api_key_auth(api_key): + """Test with X-API-Key authentication""" + print("\nTesting X-API-Key authentication...") + + headers = { + "X-API-Key": api_key, + "X-Scan-ID": SCAN_ID, + "Content-Type": "application/json" + } + + # Test get-file endpoint + url = f"{BASE_URL}/api/scanner/get-file" + params = {"file_path": FILE_PATH} + + response = requests.get(url, params=params, headers=headers) + print(f"X-API-Key auth response: {response.status_code}") + if response.status_code != 200: + print(f"Response: {response.text}") + return response.status_code == 200 or response.status_code == 404 # 404 is ok if file doesn't exist + + +def test_mixed_endpoints(): + """Test different endpoints with both authentication methods""" + print("\n" + "="*50) + print("Testing multiple endpoints with both auth methods") + print("="*50) + + # You would need real tokens for this to work + test_token = "cp_test_token_12345" + test_api_key = "cp_test_api_key_67890" + + endpoints = [ + ("GET", "/api/ai-scanner/files/list", {"path": "wp-content"}), + ("GET", "/api/ai-scanner/files/content", {"path": FILE_PATH}), + ("GET", "/api/scanner/get-file", {"file_path": FILE_PATH}), + ] + + for method, endpoint, params in endpoints: + print(f"\nTesting {method} {endpoint}") + + # Test with Bearer token + headers_bearer = { + "Authorization": f"Bearer {test_token}", + "X-Scan-ID": SCAN_ID + } + + # Test with X-API-Key + headers_api_key = { + "X-API-Key": test_api_key, + "X-Scan-ID": SCAN_ID + } + + url = f"{BASE_URL}{endpoint}" + + # Make requests (will fail without valid tokens, but shows the headers work) + if method == "GET": + response_bearer = requests.get(url, params=params, headers=headers_bearer) + response_api_key = requests.get(url, params=params, headers=headers_api_key) + + print(f" Bearer auth: {response_bearer.status_code}") + print(f" X-API-Key auth: {response_api_key.status_code}") + + +def main(): + """Main test function""" + print("CyberPanel AI Scanner Authentication Test") + print("="*50) + + if len(sys.argv) > 1: + # If token provided as argument, use it + token = sys.argv[1] + + # Test both authentication methods with the same token + # (assumes token is valid for both methods) + bearer_success = test_bearer_auth(token) + api_key_success = test_api_key_auth(token) + + print("\n" + "="*50) + print("Test Results:") + print(f" Bearer authentication: {'✓ PASS' if bearer_success else '✗ FAIL'}") + print(f" X-API-Key authentication: {'✓ PASS' if api_key_success else '✗ FAIL'}") + print("="*50) + else: + # Run mock tests to show the endpoints accept both header formats + test_mixed_endpoints() + + print("\n" + "="*50) + print("Note: To run real tests, provide a valid token:") + print(f" python {sys.argv[0]} cp_your_token_here") + print("="*50) + + +if __name__ == "__main__": + main() \ No newline at end of file