From 685c08db669728f9eacb51633be03aa1d3a05031 Mon Sep 17 00:00:00 2001 From: usmannasir Date: Sun, 27 Apr 2025 16:06:40 +0500 Subject: [PATCH] bug fix: file name --- plogical/customAcme.py | 934 ----------------------------------------- 1 file changed, 934 deletions(-) delete mode 100644 plogical/customAcme.py diff --git a/plogical/customAcme.py b/plogical/customAcme.py deleted file mode 100644 index e769e8c7d..000000000 --- a/plogical/customAcme.py +++ /dev/null @@ -1,934 +0,0 @@ -import json -import os -import time -import requests -import base64 -import hashlib -import logging -from cryptography import x509 -from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import rsa -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.asymmetric import padding -import OpenSSL -from plogical import CyberCPLogFileWriter as logging -from plogical.processUtilities import ProcessUtilities -import socket - -class CustomACME: - def __init__(self, domain, admin_email, staging=False, provider='letsencrypt'): - """Initialize CustomACME""" - logging.CyberCPLogFileWriter.writeToFile(f'Initializing CustomACME for domain: {domain}, email: {admin_email}, staging: {staging}, provider: {provider}') - self.domain = domain - self.admin_email = admin_email - self.staging = staging - self.provider = provider - - # Set the ACME directory URL based on provider and staging flag - if provider == 'zerossl': - if staging: - self.acme_directory = "https://acme-staging.zerossl.com/v2/DV90" - logging.CyberCPLogFileWriter.writeToFile('Using ZeroSSL staging ACME directory') - else: - self.acme_directory = "https://acme.zerossl.com/v2/DV90" - logging.CyberCPLogFileWriter.writeToFile('Using ZeroSSL production ACME directory') - else: # letsencrypt - if staging: - self.acme_directory = "https://acme-staging-v02.api.letsencrypt.org/directory" - logging.CyberCPLogFileWriter.writeToFile('Using Let\'s Encrypt staging ACME directory') - else: - self.acme_directory = "https://acme-v02.api.letsencrypt.org/directory" - logging.CyberCPLogFileWriter.writeToFile('Using Let\'s Encrypt production ACME directory') - - self.account_key = None - self.account_url = None - self.directory = None - self.nonce = None - self.order_url = None - self.authorizations = [] - self.finalize_url = None - self.certificate_url = None - - # Initialize paths - self.cert_path = f'/etc/letsencrypt/live/{domain}' - self.challenge_path = '/usr/local/lsws/Example/html/.well-known/acme-challenge' - self.account_key_path = f'/etc/letsencrypt/accounts/{domain}.key' - logging.CyberCPLogFileWriter.writeToFile(f'Certificate path: {self.cert_path}, Challenge path: {self.challenge_path}') - - # Create accounts directory if it doesn't exist - os.makedirs('/etc/letsencrypt/accounts', exist_ok=True) - - def _generate_account_key(self): - """Generate RSA account key""" - try: - logging.CyberCPLogFileWriter.writeToFile('Generating RSA account key...') - key = rsa.generate_private_key( - public_exponent=65537, - key_size=2048, - backend=default_backend() - ) - self.account_key = key - logging.CyberCPLogFileWriter.writeToFile('Successfully generated RSA account key') - return True - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error generating account key: {str(e)}') - return False - - def _get_directory(self): - """Get ACME directory""" - try: - logging.CyberCPLogFileWriter.writeToFile(f'Fetching ACME directory from {self.acme_directory}') - response = requests.get(self.acme_directory) - self.directory = response.json() - logging.CyberCPLogFileWriter.writeToFile(f'Successfully fetched ACME directory: {json.dumps(self.directory)}') - return True - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error getting directory: {str(e)}') - return False - - def _get_nonce(self): - """Get new nonce from ACME server""" - try: - logging.CyberCPLogFileWriter.writeToFile('Getting new nonce...') - response = requests.head(self.directory['newNonce']) - self.nonce = response.headers['Replay-Nonce'] - logging.CyberCPLogFileWriter.writeToFile(f'Successfully got nonce: {self.nonce}') - return True - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error getting nonce: {str(e)}') - return False - - def _create_jws(self, payload, url): - """Create JWS (JSON Web Signature)""" - try: - logging.CyberCPLogFileWriter.writeToFile(f'Creating JWS for URL: {url}') - if payload is not None: - logging.CyberCPLogFileWriter.writeToFile(f'Payload: {json.dumps(payload)}') - - # Get a fresh nonce for this request - if not self._get_nonce(): - logging.CyberCPLogFileWriter.writeToFile('Failed to get fresh nonce') - return None - - # Get the private key numbers - logging.CyberCPLogFileWriter.writeToFile('Getting private key numbers...') - private_numbers = self.account_key.private_numbers() - public_numbers = private_numbers.public_numbers - - # Convert numbers to bytes - logging.CyberCPLogFileWriter.writeToFile('Converting RSA numbers to bytes...') - n_bytes = public_numbers.n.to_bytes((public_numbers.n.bit_length() + 7) // 8, 'big') - e_bytes = public_numbers.e.to_bytes((public_numbers.e.bit_length() + 7) // 8, 'big') - - # Create JWK - logging.CyberCPLogFileWriter.writeToFile('Creating JWK...') - jwk_key = { - "kty": "RSA", - "n": base64.urlsafe_b64encode(n_bytes).decode('utf-8').rstrip('='), - "e": base64.urlsafe_b64encode(e_bytes).decode('utf-8').rstrip('='), - "alg": "RS256" - } - logging.CyberCPLogFileWriter.writeToFile(f'Created JWK: {json.dumps(jwk_key)}') - - # Create protected header - protected = { - "alg": "RS256", - "url": url, - "nonce": self.nonce - } - - # Add either JWK or Key ID based on whether we have an account URL - if self.account_url and url != self.directory['newAccount']: - protected["kid"] = self.account_url - logging.CyberCPLogFileWriter.writeToFile(f'Using Key ID: {self.account_url}') - else: - protected["jwk"] = jwk_key - logging.CyberCPLogFileWriter.writeToFile('Using JWK for new account') - - # Encode protected header - logging.CyberCPLogFileWriter.writeToFile('Encoding protected header...') - protected_b64 = base64.urlsafe_b64encode( - json.dumps(protected).encode('utf-8') - ).decode('utf-8').rstrip('=') - - # For POST-as-GET requests, payload_b64 should be empty string - if payload is None: - payload_b64 = "" - logging.CyberCPLogFileWriter.writeToFile('Using empty payload for POST-as-GET request') - else: - # Encode payload - logging.CyberCPLogFileWriter.writeToFile('Encoding payload...') - payload_b64 = base64.urlsafe_b64encode( - json.dumps(payload).encode('utf-8') - ).decode('utf-8').rstrip('=') - - # Create signature input - logging.CyberCPLogFileWriter.writeToFile('Creating signature input...') - signature_input = f"{protected_b64}.{payload_b64}".encode('utf-8') - - # Sign the input - logging.CyberCPLogFileWriter.writeToFile('Signing input...') - signature = self.account_key.sign( - signature_input, - padding.PKCS1v15(), - hashes.SHA256() - ) - - # Encode signature - logging.CyberCPLogFileWriter.writeToFile('Encoding signature...') - signature_b64 = base64.urlsafe_b64encode(signature).decode('utf-8').rstrip('=') - - # Create final JWS - logging.CyberCPLogFileWriter.writeToFile('Creating final JWS...') - jws = { - "protected": protected_b64, - "signature": signature_b64 - } - - # Only add payload if it exists - if payload is not None: - jws["payload"] = payload_b64 - - # Ensure the JWS is properly formatted - jws_str = json.dumps(jws, separators=(',', ':')) - logging.CyberCPLogFileWriter.writeToFile(f'Final JWS: {jws_str}') - - return jws_str - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error creating JWS: {str(e)}') - return None - - def _load_account_key(self): - """Load existing account key if available""" - try: - if os.path.exists(self.account_key_path): - logging.CyberCPLogFileWriter.writeToFile('Loading existing account key...') - with open(self.account_key_path, 'rb') as f: - key_data = f.read() - self.account_key = serialization.load_pem_private_key( - key_data, - password=None, - backend=default_backend() - ) - logging.CyberCPLogFileWriter.writeToFile('Successfully loaded existing account key') - return True - return False - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error loading account key: {str(e)}') - return False - - def _save_account_key(self): - """Save account key for future use""" - try: - logging.CyberCPLogFileWriter.writeToFile('Saving account key...') - key_data = self.account_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption() - ) - with open(self.account_key_path, 'wb') as f: - f.write(key_data) - logging.CyberCPLogFileWriter.writeToFile('Successfully saved account key') - return True - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error saving account key: {str(e)}') - return False - - def _create_account(self): - """Create new ACME account""" - try: - logging.CyberCPLogFileWriter.writeToFile('Creating new ACME account...') - payload = { - "termsOfServiceAgreed": True, - "contact": [f"mailto:{self.admin_email}"] - } - - jws = self._create_jws(payload, self.directory['newAccount']) - if not jws: - logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for account creation') - return False - - logging.CyberCPLogFileWriter.writeToFile('Sending account creation request...') - headers = { - 'Content-Type': 'application/jose+json' - } - response = requests.post(self.directory['newAccount'], data=jws, headers=headers) - logging.CyberCPLogFileWriter.writeToFile(f'Account creation response status: {response.status_code}') - logging.CyberCPLogFileWriter.writeToFile(f'Account creation response: {response.text}') - - if response.status_code == 201: - self.account_url = response.headers['Location'] - logging.CyberCPLogFileWriter.writeToFile(f'Successfully created account. Account URL: {self.account_url}') - # Save the account key for future use - self._save_account_key() - return True - elif response.status_code == 429: - logging.CyberCPLogFileWriter.writeToFile('Rate limit hit for account creation. Using staging environment...') - self.staging = True - self.acme_directory = "https://acme-staging-v02.api.letsencrypt.org/directory" - # Get new directory and nonce for staging - if not self._get_directory(): - return False - if not self._get_nonce(): - return False - # Try one more time with staging - return self._create_account() - elif response.status_code == 400 and "badNonce" in response.text: - logging.CyberCPLogFileWriter.writeToFile('Bad nonce, getting new nonce and retrying...') - if not self._get_nonce(): - return False - return self._create_account() - return False - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error creating account: {str(e)}') - return False - - def _create_order(self, domains): - """Create new order for domains""" - try: - logging.CyberCPLogFileWriter.writeToFile(f'Creating new order for domains: {domains}') - identifiers = [{"type": "dns", "value": domain} for domain in domains] - payload = { - "identifiers": identifiers - } - - jws = self._create_jws(payload, self.directory['newOrder']) - if not jws: - logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for order creation') - return False - - logging.CyberCPLogFileWriter.writeToFile('Sending order creation request...') - headers = { - 'Content-Type': 'application/jose+json' - } - response = requests.post(self.directory['newOrder'], data=jws, headers=headers) - logging.CyberCPLogFileWriter.writeToFile(f'Order creation response status: {response.status_code}') - logging.CyberCPLogFileWriter.writeToFile(f'Order creation response: {response.text}') - - if response.status_code == 201: - self.order_url = response.headers['Location'] - self.authorizations = response.json()['authorizations'] - self.finalize_url = response.json()['finalize'] - logging.CyberCPLogFileWriter.writeToFile(f'Successfully created order. Order URL: {self.order_url}') - logging.CyberCPLogFileWriter.writeToFile(f'Authorizations: {self.authorizations}') - logging.CyberCPLogFileWriter.writeToFile(f'Finalize URL: {self.finalize_url}') - return True - return False - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error creating order: {str(e)}') - return False - - def _handle_http_challenge(self, challenge): - """Handle HTTP-01 challenge""" - try: - logging.CyberCPLogFileWriter.writeToFile(f'Handling HTTP challenge: {json.dumps(challenge)}') - - # Get key authorization - key_auth = self._get_key_authorization(challenge) - if not key_auth: - logging.CyberCPLogFileWriter.writeToFile('Failed to get key authorization') - return False - - # Create challenge directory if it doesn't exist - if not os.path.exists(self.challenge_path): - logging.CyberCPLogFileWriter.writeToFile(f'Creating challenge directory: {self.challenge_path}') - os.makedirs(self.challenge_path) - - # Write challenge file - challenge_file = os.path.join(self.challenge_path, challenge['token']) - logging.CyberCPLogFileWriter.writeToFile(f'Writing challenge file: {challenge_file}') - - # Write only the key authorization to the file - with open(challenge_file, 'w') as f: - f.write(key_auth) - - logging.CyberCPLogFileWriter.writeToFile('Successfully handled HTTP challenge') - return True - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error handling HTTP challenge: {str(e)}') - return False - - def _handle_dns_challenge(self, challenge): - """Handle DNS-01 challenge (Cloudflare)""" - try: - logging.CyberCPLogFileWriter.writeToFile(f'Handling DNS challenge: {json.dumps(challenge)}') - # This is a placeholder - implement Cloudflare API integration - # You'll need to add your Cloudflare API credentials and implementation - pass - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error handling DNS challenge: {str(e)}') - return False - - def _get_key_authorization(self, challenge): - """Get key authorization for challenge""" - try: - logging.CyberCPLogFileWriter.writeToFile('Getting key authorization...') - - # Get the private key numbers - private_numbers = self.account_key.private_numbers() - public_numbers = private_numbers.public_numbers - - # Convert numbers to bytes - n_bytes = public_numbers.n.to_bytes((public_numbers.n.bit_length() + 7) // 8, 'big') - e_bytes = public_numbers.e.to_bytes((public_numbers.e.bit_length() + 7) // 8, 'big') - - # Create JWK without alg field - jwk_key = { - "kty": "RSA", - "n": base64.urlsafe_b64encode(n_bytes).decode('utf-8').rstrip('='), - "e": base64.urlsafe_b64encode(e_bytes).decode('utf-8').rstrip('=') - } - - # Calculate the JWK thumbprint according to RFC 7638 - # The thumbprint is a hash of the JWK (JSON Web Key) in a specific format - # First, we create a dictionary with the required JWK parameters - jwk = { - "e": base64.urlsafe_b64encode(public_numbers.e.to_bytes(3, 'big')).decode('utf-8').rstrip('='), - "kty": "RSA", # Key type - "n": base64.urlsafe_b64encode(public_numbers.n.to_bytes(256, 'big')).decode('utf-8').rstrip('=') - } - - # Sort the JWK parameters alphabetically by key name - # This ensures consistent thumbprint calculation regardless of parameter order - sorted_jwk = json.dumps(jwk, sort_keys=True, separators=(',', ':')) - - # Calculate the SHA-256 hash of the sorted JWK - # Example of what sorted_jwk might look like: - # {"e":"AQAB","kty":"RSA","n":"tVKUtcx_n9rt5afY_2WFNVAu9fjD4xqX4Xm3dJz3XYb"} - # The thumbprint will be a 32-byte SHA-256 hash of this string - # For example, it might look like: b'x\x9c\x1d\x8f\x8b\x1b\x1e\x8b\x1b\x1e\x8b\x1b\x1e\x8b\x1b\x1e' - thumbprint = hashlib.sha256(sorted_jwk.encode('utf-8')).digest() - - # Encode the thumbprint in base64url format (RFC 4648) - # This removes padding characters (=) and replaces + and / with - and _ - # Example final thumbprint: "xJ0dj8sbHosbHosbHosbHos" - thumbprint = base64.urlsafe_b64encode(thumbprint).decode('utf-8').rstrip('=') - - # Combine token and key authorization - key_auth = f"{challenge['token']}.{thumbprint}" - logging.CyberCPLogFileWriter.writeToFile(f'Key authorization: {key_auth}') - return key_auth - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error getting key authorization: {str(e)}') - return None - - def _verify_challenge(self, challenge_url): - """Verify challenge completion with the ACME server - - This function sends a POST request to the ACME server to verify that the challenge - has been completed successfully. The challenge URL is provided by the ACME server - when the challenge is created. - - Example challenge_url: - "https://acme-v02.api.letsencrypt.org/acme/challenge/example.com/123456" - - The verification process: - 1. Creates an empty payload (POST-as-GET request) - 2. Creates a JWS (JSON Web Signature) with the payload - 3. Sends the request to the ACME server - 4. Checks the response status - - Returns: - bool: True if challenge is verified successfully, False otherwise - """ - try: - logging.CyberCPLogFileWriter.writeToFile(f'Verifying challenge at URL: {challenge_url}') - - # Create empty payload for POST-as-GET request - # This is a special type of request where we want to GET a resource - # but need to include a signature, so we use POST with an empty payload - payload = {} - - # Create JWS (JSON Web Signature) for the request - # Example JWS might look like: - # { - # "protected": "eyJhbGciOiJSUzI1NiIsIm5vbmNlIjoiMTIzNDU2Nzg5MCIsInVybCI6Imh0dHBzOi8vYWNtZS12MDIuYXBpLmxldHNlbmNyeXB0Lm9yZy9hY21lL2NoYWxsZW5nZS9leGFtcGxlLmNvbS8xMjM0NTYifQ", - # "signature": "c2lnbmF0dXJlX2hlcmU", - # "payload": "" - # } - jws = self._create_jws(payload, challenge_url) - if not jws: - logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for challenge verification') - return False - - logging.CyberCPLogFileWriter.writeToFile('Sending challenge verification request...') - - # Set headers for the request - # Content-Type: application/jose+json indicates we're sending a JWS - headers = { - 'Content-Type': 'application/jose+json' - } - - # Send the verification request to the ACME server - # Example response might look like: - # { - # "type": "http-01", - # "status": "valid", - # "validated": "2024-03-20T12:00:00Z", - # "url": "https://acme-v02.api.letsencrypt.org/acme/challenge/example.com/123456" - # } - response = requests.post(challenge_url, data=jws, headers=headers) - logging.CyberCPLogFileWriter.writeToFile(f'Challenge verification response status: {response.status_code}') - logging.CyberCPLogFileWriter.writeToFile(f'Challenge verification response: {response.text}') - - # Check if the challenge was verified successfully - # Status code 200 indicates success - # The response will contain the challenge status and validation time - if response.status_code == 200: - logging.CyberCPLogFileWriter.writeToFile('Successfully verified challenge') - return True - return False - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error verifying challenge: {str(e)}') - return False - - def _finalize_order(self, csr): - """Finalize order and get certificate""" - try: - logging.CyberCPLogFileWriter.writeToFile('Finalizing order...') - payload = { - "csr": base64.urlsafe_b64encode(csr).decode('utf-8').rstrip('=') - } - - jws = self._create_jws(payload, self.finalize_url) - if not jws: - logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for order finalization') - return False - - logging.CyberCPLogFileWriter.writeToFile('Sending order finalization request...') - headers = { - 'Content-Type': 'application/jose+json' - } - response = requests.post(self.finalize_url, data=jws, headers=headers) - logging.CyberCPLogFileWriter.writeToFile(f'Order finalization response status: {response.status_code}') - logging.CyberCPLogFileWriter.writeToFile(f'Order finalization response: {response.text}') - - if response.status_code == 200: - # Wait for order to be processed - max_attempts = 30 - delay = 2 - for attempt in range(max_attempts): - if not self._get_nonce(): - logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for order status check') - return False - - response = requests.get(self.order_url, headers=headers) - logging.CyberCPLogFileWriter.writeToFile(f'Order status check response: {response.text}') - - if response.status_code == 200: - order_status = response.json().get('status') - if order_status == 'valid': - self.certificate_url = response.json().get('certificate') - logging.CyberCPLogFileWriter.writeToFile(f'Successfully finalized order. Certificate URL: {self.certificate_url}') - return True - elif order_status == 'invalid': - logging.CyberCPLogFileWriter.writeToFile('Order validation failed') - return False - elif order_status == 'processing': - logging.CyberCPLogFileWriter.writeToFile(f'Order still processing, attempt {attempt + 1}/{max_attempts}') - time.sleep(delay) - continue - - logging.CyberCPLogFileWriter.writeToFile(f'Order status check failed, attempt {attempt + 1}/{max_attempts}') - time.sleep(delay) - - logging.CyberCPLogFileWriter.writeToFile('Order processing timed out') - return False - return False - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error finalizing order: {str(e)}') - return False - - def _download_certificate(self): - """Download certificate from ACME server""" - try: - logging.CyberCPLogFileWriter.writeToFile('Downloading certificate...') - logging.CyberCPLogFileWriter.writeToFile(f'Certificate URL: {self.certificate_url}') - - # For certificate downloads, we can use a simple GET request - response = requests.get(self.certificate_url) - logging.CyberCPLogFileWriter.writeToFile(f'Certificate download response status: {response.status_code}') - logging.CyberCPLogFileWriter.writeToFile(f'Certificate download response headers: {response.headers}') - logging.CyberCPLogFileWriter.writeToFile(f'Certificate download response content: {response.text}') - - if response.status_code == 200: - logging.CyberCPLogFileWriter.writeToFile('Successfully downloaded certificate') - return response.content - return None - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error downloading certificate: {str(e)}') - return None - - def _wait_for_challenge_validation(self, challenge_url, max_attempts=30, delay=2): - """Wait for challenge to be validated by the ACME server""" - try: - logging.CyberCPLogFileWriter.writeToFile(f'Waiting for challenge validation at URL: {challenge_url}') - for attempt in range(max_attempts): - if not self._get_nonce(): - logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for challenge status check') - return False - - headers = { - 'Content-Type': 'application/jose+json' - } - response = requests.get(challenge_url, headers=headers) - logging.CyberCPLogFileWriter.writeToFile(f'Challenge status check response: {response.text}') - - if response.status_code == 200: - challenge_status = response.json().get('status') - if challenge_status == 'valid': - logging.CyberCPLogFileWriter.writeToFile('Challenge validated successfully') - return True - elif challenge_status == 'invalid': - logging.CyberCPLogFileWriter.writeToFile('Challenge validation failed') - return False - - logging.CyberCPLogFileWriter.writeToFile(f'Challenge still pending, attempt {attempt + 1}/{max_attempts}') - time.sleep(delay) - - logging.CyberCPLogFileWriter.writeToFile('Challenge validation timed out') - return False - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error waiting for challenge validation: {str(e)}') - return False - - def _check_dns_record(self, domain): - """Check if a domain has valid DNS records - - This function performs multiple DNS checks to ensure the domain has valid DNS records. - It includes: - 1. A record (IPv4) check - 2. AAAA record (IPv6) check - 3. DNS caching prevention - 4. Multiple DNS server checks - - Args: - domain (str): The domain to check - - Returns: - bool: True if valid DNS records are found, False otherwise - """ - try: - logging.CyberCPLogFileWriter.writeToFile(f'Checking DNS records for domain: {domain}') - - # List of public DNS servers to check against - dns_servers = [ - '8.8.8.8', # Google DNS - '1.1.1.1', # Cloudflare DNS - '208.67.222.222' # OpenDNS - ] - - # Function to check DNS record with specific DNS server - def check_with_dns_server(server, record_type='A'): - try: - # Create a new socket for each check - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.settimeout(5) # 5 second timeout - - # Set the DNS server - sock.connect((server, 53)) - - # Create DNS query - query = bytearray() - # DNS header - query += b'\x00\x01' # Transaction ID - query += b'\x01\x00' # Flags: Standard query - query += b'\x00\x01' # Questions: 1 - query += b'\x00\x00' # Answer RRs: 0 - query += b'\x00\x00' # Authority RRs: 0 - query += b'\x00\x00' # Additional RRs: 0 - - # Domain name - for part in domain.split('.'): - query.append(len(part)) - query.extend(part.encode()) - query += b'\x00' # End of domain name - - # Query type and class - if record_type == 'A': - query += b'\x00\x01' # Type: A - else: # AAAA - query += b'\x00\x1c' # Type: AAAA - query += b'\x00\x01' # Class: IN - - # Send query - sock.send(query) - - # Receive response - response = sock.recv(1024) - - # Check if we got a valid response - if len(response) > 12: # Minimum DNS response size - # Check if there are answers in the response - answer_count = int.from_bytes(response[6:8], 'big') - if answer_count > 0: - return True - - return False - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error checking DNS with server {server}: {str(e)}') - return False - finally: - sock.close() - - # Check A records (IPv4) with multiple DNS servers - a_record_found = False - for server in dns_servers: - if check_with_dns_server(server, 'A'): - a_record_found = True - break - - # Check AAAA records (IPv6) with multiple DNS servers - aaaa_record_found = False - for server in dns_servers: - if check_with_dns_server(server, 'AAAA'): - aaaa_record_found = True - break - - # Also check with system's DNS resolver as a fallback - try: - # Try to resolve A record (IPv4) - socket.gethostbyname(domain) - a_record_found = True - except socket.gaierror: - pass - - try: - # Try to resolve AAAA record (IPv6) - socket.getaddrinfo(domain, None, socket.AF_INET6) - aaaa_record_found = True - except socket.gaierror: - pass - - # Log the results - if a_record_found: - logging.CyberCPLogFileWriter.writeToFile(f'IPv4 DNS record found for domain: {domain}') - if aaaa_record_found: - logging.CyberCPLogFileWriter.writeToFile(f'IPv6 DNS record found for domain: {domain}') - - # Return True if either A or AAAA record is found - return a_record_found or aaaa_record_found - - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error checking DNS records: {str(e)}') - return False - - def _wait_for_order_processing(self, max_attempts=30, delay=2): - """Wait for order to be processed""" - try: - logging.CyberCPLogFileWriter.writeToFile('Waiting for order processing...') - for attempt in range(max_attempts): - if not self._get_nonce(): - logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for order status check') - return False - - headers = { - 'Content-Type': 'application/jose+json' - } - response = requests.get(self.order_url, headers=headers) - logging.CyberCPLogFileWriter.writeToFile(f'Order status check response: {response.text}') - - if response.status_code == 200: - order_status = response.json().get('status') - if order_status == 'valid': - self.certificate_url = response.json().get('certificate') - logging.CyberCPLogFileWriter.writeToFile('Order validated successfully') - return True - elif order_status == 'invalid': - logging.CyberCPLogFileWriter.writeToFile('Order validation failed') - return False - elif order_status == 'processing': - logging.CyberCPLogFileWriter.writeToFile(f'Order still processing, attempt {attempt + 1}/{max_attempts}') - time.sleep(delay) - continue - - logging.CyberCPLogFileWriter.writeToFile(f'Order status check failed, attempt {attempt + 1}/{max_attempts}') - time.sleep(delay) - - logging.CyberCPLogFileWriter.writeToFile('Order processing timed out') - return False - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error waiting for order processing: {str(e)}') - return False - - def issue_certificate(self, domains, use_dns=False): - """Main method to issue certificate""" - try: - logging.CyberCPLogFileWriter.writeToFile(f'Starting certificate issuance for domains: {domains}, use_dns: {use_dns}') - - # Try to load existing account key first - if self._load_account_key(): - logging.CyberCPLogFileWriter.writeToFile('Using existing account key') - else: - logging.CyberCPLogFileWriter.writeToFile('No existing account key found, will create new one') - - # Filter domains to only include those with valid DNS records - valid_domains = [] - for domain in domains: - if self._check_dns_record(domain): - valid_domains.append(domain) - else: - logging.CyberCPLogFileWriter.writeToFile(f'Skipping domain {domain} due to missing DNS records') - - if not valid_domains: - logging.CyberCPLogFileWriter.writeToFile('No valid domains found with DNS records') - return False - - # Initialize ACME - logging.CyberCPLogFileWriter.writeToFile('Step 1: Generating account key') - if not self._generate_account_key(): - logging.CyberCPLogFileWriter.writeToFile('Failed to generate account key') - return False - - logging.CyberCPLogFileWriter.writeToFile('Step 2: Getting ACME directory') - if not self._get_directory(): - logging.CyberCPLogFileWriter.writeToFile('Failed to get ACME directory') - return False - - logging.CyberCPLogFileWriter.writeToFile('Step 3: Getting nonce') - if not self._get_nonce(): - logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce') - return False - - logging.CyberCPLogFileWriter.writeToFile('Step 4: Creating account') - if not self._create_account(): - logging.CyberCPLogFileWriter.writeToFile('Failed to create account') - # If we failed to create account and we're not in staging, try staging - if not self.staging: - logging.CyberCPLogFileWriter.writeToFile('Switching to staging environment...') - self.staging = True - self.acme_directory = "https://acme-staging-v02.api.letsencrypt.org/directory" - if not self._get_directory(): - return False - if not self._get_nonce(): - return False - if not self._create_account(): - return False - else: - return False - - # Create order with only valid domains - logging.CyberCPLogFileWriter.writeToFile('Step 5: Creating order') - if not self._create_order(valid_domains): - logging.CyberCPLogFileWriter.writeToFile('Failed to create order') - return False - - # Handle challenges - logging.CyberCPLogFileWriter.writeToFile('Step 6: Handling challenges') - for auth_url in self.authorizations: - logging.CyberCPLogFileWriter.writeToFile(f'Processing authorization URL: {auth_url}') - if not self._get_nonce(): - logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for authorization') - return False - - # Get authorization details with GET request - headers = { - 'Content-Type': 'application/jose+json' - } - response = requests.get(auth_url, headers=headers) - logging.CyberCPLogFileWriter.writeToFile(f'Authorization response status: {response.status_code}') - logging.CyberCPLogFileWriter.writeToFile(f'Authorization response: {response.text}') - - if response.status_code != 200: - logging.CyberCPLogFileWriter.writeToFile('Failed to get authorization') - return False - - challenges = response.json()['challenges'] - for challenge in challenges: - logging.CyberCPLogFileWriter.writeToFile(f'Processing challenge: {json.dumps(challenge)}') - - # Only handle the challenge type we're using - if use_dns and challenge['type'] == 'dns-01': - if not self._handle_dns_challenge(challenge): - logging.CyberCPLogFileWriter.writeToFile('Failed to handle DNS challenge') - return False - if not self._verify_challenge(challenge['url']): - logging.CyberCPLogFileWriter.writeToFile('Failed to verify DNS challenge') - return False - if not self._wait_for_challenge_validation(challenge['url']): - logging.CyberCPLogFileWriter.writeToFile('DNS challenge validation failed') - return False - elif not use_dns and challenge['type'] == 'http-01': - if not self._handle_http_challenge(challenge): - logging.CyberCPLogFileWriter.writeToFile('Failed to handle HTTP challenge') - return False - if not self._verify_challenge(challenge['url']): - logging.CyberCPLogFileWriter.writeToFile('Failed to verify HTTP challenge') - return False - if not self._wait_for_challenge_validation(challenge['url']): - logging.CyberCPLogFileWriter.writeToFile('HTTP challenge validation failed') - return False - else: - logging.CyberCPLogFileWriter.writeToFile(f'Skipping {challenge["type"]} challenge') - - # Generate CSR - logging.CyberCPLogFileWriter.writeToFile('Step 7: Generating CSR') - key = rsa.generate_private_key( - public_exponent=65537, - key_size=2048, - backend=default_backend() - ) - - # Get the domain from the order response - order_response = requests.get(self.order_url, headers=headers).json() - order_domains = [identifier['value'] for identifier in order_response['identifiers']] - logging.CyberCPLogFileWriter.writeToFile(f'Order domains: {order_domains}') - - # Create CSR with exactly the domains from the order - csr = x509.CertificateSigningRequestBuilder().subject_name( - x509.Name([ - x509.NameAttribute(x509.NameOID.COMMON_NAME, order_domains[0]) - ]) - ).add_extension( - x509.SubjectAlternativeName([ - x509.DNSName(domain) for domain in order_domains - ]), - critical=False - ).sign(key, hashes.SHA256(), default_backend()) - - # Finalize order - logging.CyberCPLogFileWriter.writeToFile('Step 8: Finalizing order') - if not self._finalize_order(csr.public_bytes(serialization.Encoding.DER)): - logging.CyberCPLogFileWriter.writeToFile('Failed to finalize order') - return False - - # Wait for order processing - logging.CyberCPLogFileWriter.writeToFile('Step 9: Waiting for order processing') - if not self._wait_for_order_processing(): - logging.CyberCPLogFileWriter.writeToFile('Failed to process order') - return False - - # Download certificate - logging.CyberCPLogFileWriter.writeToFile('Step 10: Downloading certificate') - certificate = self._download_certificate() - if not certificate: - logging.CyberCPLogFileWriter.writeToFile('Failed to download certificate') - return False - - # Save certificate - logging.CyberCPLogFileWriter.writeToFile('Step 11: Saving certificate') - if not os.path.exists(self.cert_path): - logging.CyberCPLogFileWriter.writeToFile(f'Creating certificate directory: {self.cert_path}') - os.makedirs(self.cert_path) - - cert_file = os.path.join(self.cert_path, 'fullchain.pem') - key_file = os.path.join(self.cert_path, 'privkey.pem') - - logging.CyberCPLogFileWriter.writeToFile(f'Saving certificate to: {cert_file}') - with open(cert_file, 'wb') as f: - f.write(certificate) - - logging.CyberCPLogFileWriter.writeToFile(f'Saving private key to: {key_file}') - with open(key_file, 'wb') as f: - f.write(key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption() - )) - - logging.CyberCPLogFileWriter.writeToFile('Successfully completed certificate issuance') - return True - except Exception as e: - logging.CyberCPLogFileWriter.writeToFile(f'Error issuing certificate: {str(e)}') - return False \ No newline at end of file