diff --git a/plogical/customACME.py b/plogical/customACME.py index e769e8c7d..cf93c23d9 100644 --- a/plogical/customACME.py +++ b/plogical/customACME.py @@ -4,6 +4,7 @@ import time import requests import base64 import hashlib +import hmac import logging from cryptography import x509 from cryptography.hazmat.primitives import serialization @@ -92,7 +93,20 @@ class CustomACME: try: logging.CyberCPLogFileWriter.writeToFile('Getting new nonce...') response = requests.head(self.directory['newNonce']) - self.nonce = response.headers['Replay-Nonce'] + + # Check for nonce in headers (case-insensitive) + nonce_header = None + for header_name in ['Replay-Nonce', 'replay-nonce', 'REPLAY-NONCE']: + if header_name in response.headers: + nonce_header = header_name + break + + if not nonce_header: + # Log all available headers for debugging + logging.CyberCPLogFileWriter.writeToFile(f'Available headers: {list(response.headers.keys())}') + raise KeyError('Replay-Nonce header not found in response') + + self.nonce = response.headers[nonce_header] logging.CyberCPLogFileWriter.writeToFile(f'Successfully got nonce: {self.nonce}') return True except Exception as e: @@ -244,6 +258,27 @@ class CustomACME: "contact": [f"mailto:{self.admin_email}"] } + # Check if External Account Binding is required (for ZeroSSL) + if self.provider == 'zerossl' and 'meta' in self.directory and 'externalAccountRequired' in self.directory['meta']: + if self.directory['meta']['externalAccountRequired']: + logging.CyberCPLogFileWriter.writeToFile('ZeroSSL requires External Account Binding, getting EAB credentials...') + + # Get EAB credentials from ZeroSSL + eab_kid, eab_hmac_key = self._get_zerossl_eab_credentials() + if not eab_kid or not eab_hmac_key: + logging.CyberCPLogFileWriter.writeToFile('Failed to get ZeroSSL EAB credentials, falling back to Let\'s Encrypt') + # Fallback to Let's Encrypt + self.provider = 'letsencrypt' + self.acme_directory = "https://acme-v02.api.letsencrypt.org/directory" + if not self._get_directory(): + return False + if not self._get_nonce(): + return False + return self._create_account() + + # Add EAB to payload + payload['externalAccountBinding'] = self._create_eab(eab_kid, eab_hmac_key) + jws = self._create_jws(payload, self.directory['newAccount']) if not jws: logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for account creation') @@ -284,6 +319,90 @@ class CustomACME: logging.CyberCPLogFileWriter.writeToFile(f'Error creating account: {str(e)}') return False + def _get_zerossl_eab_credentials(self): + """Get External Account Binding credentials from ZeroSSL""" + try: + logging.CyberCPLogFileWriter.writeToFile('Getting ZeroSSL EAB credentials...') + + # Request EAB credentials from ZeroSSL API + eab_url = 'https://api.zerossl.com/acme/eab-credentials-email' + headers = { + 'Content-Type': 'application/x-www-form-urlencoded' + } + data = { + 'email': self.admin_email + } + + response = requests.post(eab_url, headers=headers, data=data) + logging.CyberCPLogFileWriter.writeToFile(f'ZeroSSL EAB response status: {response.status_code}') + logging.CyberCPLogFileWriter.writeToFile(f'ZeroSSL EAB response: {response.text}') + + if response.status_code == 200: + eab_data = response.json() + if 'eab_kid' in eab_data and 'eab_hmac_key' in eab_data: + return eab_data['eab_kid'], eab_data['eab_hmac_key'] + + return None, None + except Exception as e: + logging.CyberCPLogFileWriter.writeToFile(f'Error getting ZeroSSL EAB credentials: {str(e)}') + return None, None + + def _create_eab(self, eab_kid, eab_hmac_key): + """Create External Account Binding for ZeroSSL""" + try: + logging.CyberCPLogFileWriter.writeToFile('Creating External Account Binding...') + + # Get the private key numbers + private_numbers = self.account_key.private_numbers() + public_numbers = private_numbers.public_numbers + + # Convert numbers to bytes + n_bytes = public_numbers.n.to_bytes((public_numbers.n.bit_length() + 7) // 8, 'big') + e_bytes = public_numbers.e.to_bytes((public_numbers.e.bit_length() + 7) // 8, 'big') + + # Create JWK + jwk = { + "kty": "RSA", + "n": base64.urlsafe_b64encode(n_bytes).decode('utf-8').rstrip('='), + "e": base64.urlsafe_b64encode(e_bytes).decode('utf-8').rstrip('=') + } + + # Create protected header for EAB + protected = { + "alg": "HS256", + "kid": eab_kid, + "url": self.directory['newAccount'] + } + + # Encode protected header + protected_b64 = base64.urlsafe_b64encode( + json.dumps(protected).encode('utf-8') + ).decode('utf-8').rstrip('=') + + # Encode JWK payload + payload_b64 = base64.urlsafe_b64encode( + json.dumps(jwk).encode('utf-8') + ).decode('utf-8').rstrip('=') + + # Create signature using HMAC-SHA256 + signature_input = f"{protected_b64}.{payload_b64}".encode('utf-8') + hmac_key = base64.urlsafe_b64decode(eab_hmac_key + '==') # Add padding if needed + signature = hmac.new(hmac_key, signature_input, hashlib.sha256).digest() + signature_b64 = base64.urlsafe_b64encode(signature).decode('utf-8').rstrip('=') + + # Create EAB object + eab = { + "protected": protected_b64, + "payload": payload_b64, + "signature": signature_b64 + } + + logging.CyberCPLogFileWriter.writeToFile('Successfully created External Account Binding') + return eab + except Exception as e: + logging.CyberCPLogFileWriter.writeToFile(f'Error creating External Account Binding: {str(e)}') + return None + def _create_order(self, domains): """Create new order for domains""" try: