Fix: Add local DNS fallback to reverse_dns_lookup() - Resolves issue #1654

This commit is contained in:
master3395
2026-01-31 20:46:30 +01:00
parent 754e17d48b
commit 12bbdded87

View File

@@ -1637,6 +1637,7 @@ LogFile /var/log/clamav/clamav.log
def reverse_dns_lookup(ip_address):
"""
Perform reverse DNS lookup for the given IP address using external DNS servers.
Falls back to local DNS resolution if external APIs are unavailable.
Args:
ip_address: The IP address to perform reverse DNS lookup on
@@ -1652,38 +1653,35 @@ LogFile /var/log/clamav/clamav.log
try:
fetchURLs = requests.get('https://cyberpanel.net/dnsServers.txt', timeout=10)
except (ConnectionError, Timeout) as e:
logging.CyberCPLogFileWriter.writeToFile(f'Failed to fetch DNS server list from cyberpanel.net: {str(e)}')
return []
logging.CyberCPLogFileWriter.writeToFile(f'Failed to fetch DNS server list from cyberpanel.net: {str(e)}. Falling back to local DNS lookup.')
fetchURLs = None
except RequestException as e:
logging.CyberCPLogFileWriter.writeToFile(f'Request error while fetching DNS server list: {str(e)}')
return []
logging.CyberCPLogFileWriter.writeToFile(f'Request error while fetching DNS server list: {str(e)}. Falling back to local DNS lookup.')
fetchURLs = None
if fetchURLs.status_code != 200:
logging.CyberCPLogFileWriter.writeToFile(f'Failed to fetch DNS server list: HTTP {fetchURLs.status_code}')
return []
# Try external API if available
if fetchURLs and fetchURLs.status_code == 200:
try:
urls_data = fetchURLs.json()
if 'urls' not in urls_data:
logging.CyberCPLogFileWriter.writeToFile('DNS server list response missing "urls" key')
return []
urls = urls_data['urls']
except (ValueError, KeyError) as e:
logging.CyberCPLogFileWriter.writeToFile(f'Failed to parse DNS server list JSON: {str(e)}')
return []
try:
urls_data = fetchURLs.json()
if 'urls' not in urls_data:
logging.CyberCPLogFileWriter.writeToFile('DNS server list response missing "urls" key. Falling back to local DNS lookup.')
fetchURLs = None
else:
urls = urls_data['urls']
if not isinstance(urls, list) or len(urls) == 0:
logging.CyberCPLogFileWriter.writeToFile('DNS server list is empty or invalid. Falling back to local DNS lookup.')
fetchURLs = None
else:
# External API is available, proceed with queries
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(f'DNS urls {urls}.')
if not isinstance(urls, list) or len(urls) == 0:
logging.CyberCPLogFileWriter.writeToFile('DNS server list is empty or invalid')
return []
results = []
successful_queries = 0
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(f'DNS urls {urls}.')
results = []
successful_queries = 0
# Query each DNS server
for url in urls:
# Query each DNS server
for url in urls:
try:
response = requests.get(f'{url}/index.php?ip={ip_address}', timeout=5)
@@ -1748,18 +1746,58 @@ LogFile /var/log/clamav/clamav.log
logging.CyberCPLogFileWriter.writeToFile(f'Unexpected error while querying DNS server {url}: {str(e)}')
continue
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(f'rDNS result of {ip_address} is {str(results)} (successful queries: {successful_queries}/{len(urls)})')
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(f'rDNS result of {ip_address} is {str(results)} (successful queries: {successful_queries}/{len(urls)})')
# Return results (empty list if no successful queries)
return results
# If external API returned results, return them
if results:
return results
# Otherwise fall through to local DNS lookup
logging.CyberCPLogFileWriter.writeToFile(f'External DNS API queries returned no results for {ip_address}. Falling back to local DNS lookup.')
except (ValueError, KeyError) as e:
logging.CyberCPLogFileWriter.writeToFile(f'Failed to parse DNS server list JSON: {str(e)}. Falling back to local DNS lookup.')
fetchURLs = None
else:
if fetchURLs:
logging.CyberCPLogFileWriter.writeToFile(f'Failed to fetch DNS server list: HTTP {fetchURLs.status_code if fetchURLs else "N/A"}. Falling back to local DNS lookup.')
fetchURLs = None
# Fallback to local DNS lookup when external APIs fail or return no results
try:
import socket
rdns = socket.gethostbyaddr(ip_address)[0]
rdns_clean = rdns.rstrip('.')
logging.CyberCPLogFileWriter.writeToFile(f'Local DNS lookup successful for {ip_address}: {rdns_clean}')
return [rdns_clean]
except socket.herror as e:
logging.CyberCPLogFileWriter.writeToFile(f'Local DNS lookup failed for {ip_address}: {str(e)}')
return []
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Unexpected error in local DNS lookup for {ip_address}: {str(e)}')
return []
except ImportError as e:
logging.CyberCPLogFileWriter.writeToFile(f'Failed to import requests library: {str(e)}')
return []
logging.CyberCPLogFileWriter.writeToFile(f'Failed to import requests library: {str(e)}. Attempting local DNS lookup.')
try:
import socket
rdns = socket.gethostbyaddr(ip_address)[0]
rdns_clean = rdns.rstrip('.')
logging.CyberCPLogFileWriter.writeToFile(f'Local DNS lookup successful for {ip_address}: {rdns_clean}')
return [rdns_clean]
except Exception as local_e:
logging.CyberCPLogFileWriter.writeToFile(f'Local DNS lookup also failed for {ip_address}: {str(local_e)}')
return []
except BaseException as e:
logging.CyberCPLogFileWriter.writeToFile(f'Unexpected error in reverse_dns_lookup for IP {ip_address}: {str(e)}')
return []
logging.CyberCPLogFileWriter.writeToFile(f'Unexpected error in reverse_dns_lookup for IP {ip_address}: {str(e)}. Attempting local DNS lookup.')
try:
import socket
rdns = socket.gethostbyaddr(ip_address)[0]
rdns_clean = rdns.rstrip('.')
logging.CyberCPLogFileWriter.writeToFile(f'Local DNS lookup successful for {ip_address}: {rdns_clean}')
return [rdns_clean]
except Exception as local_e:
logging.CyberCPLogFileWriter.writeToFile(f'Local DNS lookup also failed for {ip_address}: {str(local_e)}')
return []
@staticmethod
def SaveEmailLimitsNew(tempPath):