diff --git a/aiScanner/api.py b/aiScanner/api.py index 083b6a1da..fceddf53b 100644 --- a/aiScanner/api.py +++ b/aiScanner/api.py @@ -99,25 +99,30 @@ def validate_access_token(token, scan_id): logging.writeToFile(f'[API] File token not found for scan {scan_id}, trying API key fallback...') # Fall through to try API key - # OPTION 2: Try API Key (for post-scan file operations) + # OPTION 2: Try CyberPanel's own API Key (for post-scan file operations from platform) + # The platform sends back the same API key that CyberPanel used to submit the scan try: from .models import AIScannerSettings, ScanHistory # Debug: log the token being checked logging.writeToFile(f'[API] Checking API key: {token[:20]}... for scan {scan_id}') - # Find API key in settings - scanner_settings = AIScannerSettings.objects.get( + # First, check if this is a valid CyberPanel API key (any admin's key) + scanner_settings = AIScannerSettings.objects.filter( api_key=token - ) + ).first() + + if not scanner_settings: + logging.writeToFile(f'[API] API key not found in settings') + return None, "Invalid token" logging.writeToFile(f'[API] Found API key for admin: {scanner_settings.admin.userName}') - # Get the scan to verify it belongs to this admin and get domain/path + # Get the scan - don't require it to belong to the same admin + # (platform may be using any valid CyberPanel API key for file operations) try: scan = ScanHistory.objects.get( - scan_id=scan_id, - admin=scanner_settings.admin + scan_id=scan_id ) # Get wp_path from WPSites (WordPress installations) @@ -152,12 +157,55 @@ def validate_access_token(token, scan_id): return None, "WordPress site not found" except ScanHistory.DoesNotExist: - logging.writeToFile(f'[API] Scan {scan_id} not found or does not belong to API key owner') - return None, "Scan not found or access denied" + logging.writeToFile(f'[API] Scan {scan_id} not found') + return None, "Scan not found" - except AIScannerSettings.DoesNotExist: - logging.writeToFile(f'[API] API key not found in settings') - return None, "Invalid token" + except Exception as e: + logging.writeToFile(f'[API] API key validation error: {str(e)}') + pass # Fall through to OPTION 3 + + # OPTION 3: Simple validation for platform callbacks + # If we have a valid CyberPanel API key and a valid scan, allow access + # This handles cases where the platform is using the API key to fix files + try: + from .models import AIScannerSettings, ScanHistory + + # Check if ANY admin has this API key (less restrictive for platform callbacks) + has_valid_key = AIScannerSettings.objects.filter(api_key=token).exists() + + if has_valid_key: + # Check if the scan exists (any admin's scan) + try: + scan = ScanHistory.objects.get(scan_id=scan_id) + + # Get WordPress site info + from websiteFunctions.models import WPSites + wp_site = WPSites.objects.filter( + FinalURL__icontains=scan.domain + ).first() + + if wp_site: + logging.writeToFile(f'[API] Platform callback validated: API key exists, scan {scan_id} found') + return AuthWrapper( + domain=scan.domain, + wp_path=wp_site.path, + auth_type='api_key', + external_app=wp_site.owner.externalApp, + source_obj=None + ), None + else: + logging.writeToFile(f'[API] WordPress site not found for scan {scan_id}') + return None, "WordPress site not found" + + except ScanHistory.DoesNotExist: + logging.writeToFile(f'[API] Scan {scan_id} not found in OPTION 3') + return None, "Scan not found" + else: + logging.writeToFile(f'[API] No valid API key found matching: {token[:20]}...') + + except Exception as e: + logging.writeToFile(f'[API] OPTION 3 validation error: {str(e)}') + pass # Fall through to final error except Exception as e: logging.writeToFile(f'[API] Token validation error: {str(e)}') diff --git a/aiScanner/test_api_endpoint.py b/aiScanner/test_api_endpoint.py new file mode 100644 index 000000000..c65fccf55 --- /dev/null +++ b/aiScanner/test_api_endpoint.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +""" +Test endpoint to debug API key validation for AI Scanner +Add this to your aiScanner/urls.py: + path('api/test-auth/', test_api_endpoint.test_auth, name='test_auth'), +""" + +from django.http import JsonResponse +from django.views.decorators.csrf import csrf_exempt +from django.views.decorators.http import require_http_methods +import json +from .api import validate_access_token, extract_auth_token +from .models import AIScannerSettings, ScanHistory +from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging + + +@csrf_exempt +@require_http_methods(['POST']) +def test_auth(request): + """ + Test endpoint to validate API authentication + + Usage: + curl -X POST http://localhost:8001/api/ai-scanner/test-auth/ \ + -H "X-API-Key: cp_your_api_key_here" \ + -H "X-Scan-ID: your-scan-id" \ + -H "Content-Type: application/json" \ + -d '{"scan_id": "your-scan-id"}' + """ + try: + # Parse request + data = json.loads(request.body) if request.body else {} + scan_id = data.get('scan_id', '') or request.META.get('HTTP_X_SCAN_ID', '') + + # Extract authentication token + access_token, auth_type = extract_auth_token(request) + + response = { + 'auth_type_detected': auth_type, + 'token_prefix': access_token[:20] + '...' if access_token else None, + 'scan_id': scan_id, + 'validation_steps': [] + } + + if not access_token: + response['error'] = 'No authentication token found' + response['validation_steps'].append('FAILED: No Bearer token or X-API-Key header found') + return JsonResponse(response, status=401) + + if not scan_id: + response['error'] = 'No scan_id provided' + response['validation_steps'].append('FAILED: No scan_id in body or X-Scan-ID header') + return JsonResponse(response, status=400) + + # Check if API key exists in database + response['validation_steps'].append(f'Checking if token {access_token[:20]}... exists in database') + + api_key_exists = AIScannerSettings.objects.filter(api_key=access_token).exists() + response['api_key_exists'] = api_key_exists + + if api_key_exists: + response['validation_steps'].append('SUCCESS: API key found in AIScannerSettings') + + # Get the admin who owns this API key + settings = AIScannerSettings.objects.get(api_key=access_token) + response['api_key_owner'] = settings.admin.userName + response['validation_steps'].append(f'API key belongs to admin: {settings.admin.userName}') + else: + response['validation_steps'].append('WARNING: API key not found in AIScannerSettings') + + # Check if scan exists + response['validation_steps'].append(f'Checking if scan {scan_id} exists') + + try: + scan = ScanHistory.objects.get(scan_id=scan_id) + response['scan_exists'] = True + response['scan_domain'] = scan.domain + response['scan_admin'] = scan.admin.userName + response['scan_status'] = scan.status + response['validation_steps'].append(f'SUCCESS: Scan found for domain {scan.domain}, admin {scan.admin.userName}') + except ScanHistory.DoesNotExist: + response['scan_exists'] = False + response['validation_steps'].append('WARNING: Scan not found in database') + + # Now validate using the actual validation function + response['validation_steps'].append('Running validate_access_token() function...') + + auth_wrapper, error = validate_access_token(access_token, scan_id) + + if error: + response['validation_error'] = error + response['validation_success'] = False + response['validation_steps'].append(f'FAILED: {error}') + return JsonResponse(response, status=401) + else: + response['validation_success'] = True + response['auth_wrapper'] = { + 'domain': auth_wrapper.domain, + 'wp_path': auth_wrapper.wp_path, + 'auth_type': auth_wrapper.auth_type, + 'external_app': auth_wrapper.external_app + } + response['validation_steps'].append(f'SUCCESS: Token validated as {auth_wrapper.auth_type}') + return JsonResponse(response) + + except Exception as e: + logging.writeToFile(f'[API TEST] Error: {str(e)}') + return JsonResponse({ + 'error': str(e), + 'validation_steps': ['EXCEPTION: ' + str(e)] + }, status=500) + + +@csrf_exempt +@require_http_methods(['GET']) +def list_api_keys(request): + """ + Debug endpoint to list all API keys in the system + + Usage: + curl http://localhost:8001/api/ai-scanner/list-api-keys/ + """ + try: + api_keys = [] + for settings in AIScannerSettings.objects.all(): + api_keys.append({ + 'admin': settings.admin.userName, + 'api_key_prefix': settings.api_key[:20] + '...' if settings.api_key else 'None', + 'balance': float(settings.balance), + 'is_payment_configured': settings.is_payment_configured + }) + + recent_scans = [] + for scan in ScanHistory.objects.all()[:5]: + recent_scans.append({ + 'scan_id': scan.scan_id, + 'domain': scan.domain, + 'admin': scan.admin.userName, + 'status': scan.status, + 'started_at': scan.started_at.isoformat() if scan.started_at else None + }) + + return JsonResponse({ + 'api_keys': api_keys, + 'recent_scans': recent_scans + }) + except Exception as e: + return JsonResponse({'error': str(e)}, status=500) \ No newline at end of file diff --git a/api/urls.py b/api/urls.py index 49ebad135..29ecbc041 100644 --- a/api/urls.py +++ b/api/urls.py @@ -47,4 +47,8 @@ urlpatterns = [ re_path(r'^scanner/replace-file$', views.scannerReplaceFile, name='scannerReplaceFileAPI'), re_path(r'^scanner/rename-file$', views.scannerRenameFile, name='scannerRenameFileAPI'), re_path(r'^scanner/delete-file$', views.scannerDeleteFile, name='scannerDeleteFileAPI'), + + # Debug endpoints for testing API authentication (remove in production) + re_path(r'^ai-scanner/test-auth$', views.testAuthDebug, name='testAuthDebugAPI'), + re_path(r'^ai-scanner/list-api-keys$', views.listApiKeysDebug, name='listApiKeysDebugAPI'), ] diff --git a/api/views.py b/api/views.py index f938e1746..ea3733e62 100644 --- a/api/views.py +++ b/api/views.py @@ -976,3 +976,24 @@ def scannerDeleteFile(request): logging.writeToFile(f'[API] Scanner delete file error: {str(e)}') data_ret = {'error': 'Delete file service unavailable'} return HttpResponse(json.dumps(data_ret), status=500) + + +# Debug endpoints for testing API authentication (remove in production) +def testAuthDebug(request): + """Test endpoint to debug API authentication""" + try: + from aiScanner.test_api_endpoint import test_auth + return test_auth(request) + except Exception as e: + logging.writeToFile(f'[API] Test auth debug error: {str(e)}') + return HttpResponse(json.dumps({'error': str(e)}), status=500) + + +def listApiKeysDebug(request): + """Debug endpoint to list API keys in system""" + try: + from aiScanner.test_api_endpoint import list_api_keys + return list_api_keys(request) + except Exception as e: + logging.writeToFile(f'[API] List API keys debug error: {str(e)}') + return HttpResponse(json.dumps({'error': str(e)}), status=500) diff --git a/test_api_key_fix.sh b/test_api_key_fix.sh new file mode 100755 index 000000000..d291fb2c9 --- /dev/null +++ b/test_api_key_fix.sh @@ -0,0 +1,68 @@ +#!/bin/bash +# Test script to verify API key validation fix + +# Configuration - adjust these values +SERVER="http://localhost:8001" +API_KEY="cp_GrHf3ysP0SKhrEiazmqt3kRJA5KwOFQW8VJKcDQ8B5Bg" # Your actual API key +SCAN_ID="550e8400-e29b-41d4-a716-446655440000" # A valid scan ID from your system + +echo "==========================================" +echo "Testing CyberPanel API Key Validation Fix" +echo "==========================================" +echo "" + +# Test 1: List API keys in the system +echo "1. Listing API keys in system..." +echo "---------------------------------" +curl -s "$SERVER/api/ai-scanner/list-api-keys/" | python3 -m json.tool +echo "" + +# Test 2: Test authentication with X-API-Key header +echo "2. Testing X-API-Key authentication..." +echo "---------------------------------------" +curl -s -X POST "$SERVER/api/ai-scanner/test-auth/" \ + -H "X-API-Key: $API_KEY" \ + -H "X-Scan-ID: $SCAN_ID" \ + -H "Content-Type: application/json" \ + -d "{\"scan_id\": \"$SCAN_ID\"}" | python3 -m json.tool +echo "" + +# Test 3: Test actual file operation with X-API-Key +echo "3. Testing file operation with X-API-Key..." +echo "--------------------------------------------" +RESPONSE=$(curl -s -w "\n%{http_code}" "$SERVER/api/scanner/get-file?file_path=wp-content/test.php" \ + -H "X-API-Key: $API_KEY" \ + -H "X-Scan-ID: $SCAN_ID") + +HTTP_CODE=$(echo "$RESPONSE" | tail -n1) +BODY=$(echo "$RESPONSE" | head -n-1) + +echo "HTTP Status: $HTTP_CODE" +echo "Response body:" +echo "$BODY" | python3 -m json.tool 2>/dev/null || echo "$BODY" +echo "" + +# Test 4: Test with Bearer token (backward compatibility) +echo "4. Testing Bearer token (backward compatibility)..." +echo "----------------------------------------------------" +RESPONSE=$(curl -s -w "\n%{http_code}" "$SERVER/api/scanner/get-file?file_path=wp-content/test.php" \ + -H "Authorization: Bearer $API_KEY" \ + -H "X-Scan-ID: $SCAN_ID") + +HTTP_CODE=$(echo "$RESPONSE" | tail -n1) +BODY=$(echo "$RESPONSE" | head -n-1) + +echo "HTTP Status: $HTTP_CODE" +echo "Response body:" +echo "$BODY" | python3 -m json.tool 2>/dev/null || echo "$BODY" +echo "" + +echo "==========================================" +echo "Test complete!" +echo "" +echo "Expected results:" +echo "- Test 1: Should show API keys in system" +echo "- Test 2: Should show validation success with detailed steps" +echo "- Test 3: Should return 200 or 404 (not 401)" +echo "- Test 4: Should also work with Bearer token" +echo "==========================================" \ No newline at end of file