140 lines
5.1 KiB
Python
140 lines
5.1 KiB
Python
# Module-boundary marker: reports the 'start' of the 'DNSTest' module together
# with the value returned by __line__().
# NOTE(review): register_module_line and __line__ are not defined in this file;
# presumably they are injected by the hosting platform -- confirm.
register_module_line('DNSTest', 'start', __line__())
|
|
# Emit the pack name and version through demisto.debug when the module loads.
demisto.debug('pack name = DNSTest, pack version = 1.0.0')
|
|
|
|
import dns.resolver
|
|
from time import sleep
|
|
|
|
def get_dns_resolvers():
    """Return the nameservers known to dnspython's default resolver.

    Returns:
        The ``nameservers`` attribute of the object returned by
        ``dns.resolver.get_default_resolver()``.
    """
    return dns.resolver.get_default_resolver().nameservers
|
|
|
|
def resolve_domain(domain, timeout=1, iterations=100, delay=0):
    """Query a domain repeatedly against every locally configured resolver.

    For each nameserver returned by ``get_dns_resolvers()`` and each record
    type (A, AAAA, CNAME) the query is sent ``iterations`` times, so that
    intermittent resolver failures show up as a success/failure ratio.

    Args:
        domain: Domain name to query.
        timeout: Passed as ``lifetime`` to ``dns.resolver.resolve_at``.
        iterations: Number of queries per resolver and record type.
        delay: Seconds to sleep after each query; no sleep when not positive.

    Returns:
        dict: ``{resolver: {rtype: stats}}`` where ``stats`` holds
        ``'successes'`` and ``'failures'`` counters, ``'answers'`` (sorted
        list of distinct answer strings) and, only when at least one query
        failed, ``'errors'`` (sorted list of distinct error messages).
    """
    rtypes = ['A', 'AAAA', 'CNAME']

    # Drop duplicate nameserver entries while keeping their order. Visiting the
    # same resolver twice would re-enter an already finalized entry (whose
    # 'answers' is a list, not a set) and double-count its counters.
    resolvers = list(dict.fromkeys(get_dns_resolvers()))

    result = {}
    for resolver in resolvers:
        result[resolver] = {}

        for rtype in rtypes:
            demisto.debug(f'Resolving {domain} using {resolver} for {rtype} records...')

            # Sets collapse the repeated answers/errors produced by the
            # repeated queries; they are converted to sorted lists below.
            errors = set()
            seen_answers = set()
            stats = {'successes': 0, 'failures': 0, 'answers': seen_answers}
            result[resolver][rtype] = stats
            demisto.debug(f'{stats}')

            # Repeat the query to surface resolvers that respond only some of the time.
            for _ in range(iterations):
                answered = False
                error = None
                try:
                    response = dns.resolver.resolve_at(resolver, domain, rtype, lifetime=timeout)
                    if response:
                        answered = True
                        seen_answers.update(str(record) for record in response)
                except dns.resolver.NXDOMAIN:
                    error = f'Domain {domain} does not exist.'
                except dns.resolver.NoAnswer:
                    error = f'No {rtype} records found for domain {domain}.'
                except dns.resolver.Timeout:
                    error = f'Timeout occurred while resolving domain {domain}.'
                except Exception as e:
                    # Best effort: any other failure is recorded, never raised.
                    error = f'An error occurred while resolving domain {domain}: {str(e)}'

                if error is not None:
                    errors.add(error)
                    demisto.info(error)

                demisto.info(f'Results for {resolver} and {rtype}: {answered}')

                if answered:
                    demisto.debug('incrementing successes')
                    stats['successes'] += 1
                else:
                    demisto.debug('incrementing failures')
                    stats['failures'] += 1

                if delay > 0:
                    sleep(delay)

            # Lists instead of sets for the returned structure; sorted so the
            # output order is stable between runs.
            stats['answers'] = sorted(seen_answers)

            if errors:
                demisto.debug(f'errors for {resolver} and {rtype}: {errors}')
                stats['errors'] = sorted(errors)

    return result
|
|
|
|
def test_module(default_domain, timeout):
    """Smoke-test the resolver helpers.

    Runs a single-iteration ``resolve_domain`` for ``default_domain`` and
    fetches the resolver list with ``get_dns_resolvers``.

    Args:
        default_domain: Domain passed to ``resolve_domain``.
        timeout: Timeout passed to ``resolve_domain``.

    Returns:
        str: ``'ok'`` when both checks pass, otherwise
        ``'Get resolvers test failed'`` or ``'Resolve test failed'``.
    """
    result = resolve_domain(default_domain, timeout, iterations=1)
    resolvers = get_dns_resolvers()

    # The resolver list is checked first, so its message wins when both fail.
    if not (isinstance(resolvers, list) and len(resolvers) > 0):
        return 'Get resolvers test failed'

    # NOTE(review): this only verifies that a non-empty dict came back; it does
    # not look at the success/failure counters inside it -- confirm whether a
    # run in which every query failed should still report 'ok'.
    if not (isinstance(result, dict) and len(result) > 0):
        return 'Resolve test failed'

    return 'ok'
|
|
|
|
|
|
def _first_configured(*candidates, default):
    """Return the first candidate that is neither None nor '', else ``default``."""
    for candidate in candidates:
        if candidate is not None and candidate != '':
            return candidate
    return default


def main():
    """Read the command, parameters and arguments, then dispatch the command.

    Supported commands:
        dns-test-resolve-domain: resolve the ``domain`` argument and return
            the per-resolver statistics.
        dns-test-get-resolvers: return the configured nameservers.
        test-module: run the self test against the default domain.

    Any other command is reported through ``return_error``.
    """
    command = demisto.command()
    demisto.info(f'Executing command {command}')

    args = demisto.args()
    params = demisto.params()

    # A key that is present but empty (None or '') must fall back to the
    # default rather than reach int(), which would raise on it.
    timeout = int(_first_configured(params.get('timeout'), default=1))
    default_domain = _first_configured(params.get('default_domain'), default='google.com')

    # Precedence: command argument, then integration parameter, then built-in default.
    delay = int(_first_configured(args.get('delay'), params.get('delay'), default=0))
    iterations = int(_first_configured(args.get('count'), params.get('iterations'), default=100))

    if command == 'dns-test-resolve-domain':
        domain = args.get('domain')
        if not domain:
            # Without this guard a missing argument would be queried as None.
            return_error('The "domain" argument is required.')
            return
        return_results(resolve_domain(domain, timeout, iterations=iterations, delay=delay))

    elif command == 'dns-test-get-resolvers':
        return_results(get_dns_resolvers())

    elif command == 'test-module':
        return_results(test_module(default_domain, timeout))

    else:
        return_error(f'Unsupported command: {command}')
|
|
|
|
# Entry point: run main() when the module name is one of the listed values.
if __name__ in ('__main__', '__builtin__', 'builtins'):
    main()
|
|
|
|
# Module-boundary marker: reports the 'end' of the 'DNSTest' module together
# with the value returned by __line__().
# NOTE(review): register_module_line and __line__ are not defined in this file;
# presumably they are injected by the hosting platform -- confirm.
register_module_line('DNSTest', 'end', __line__())
|