diff --git a/plinth/modules/dynamicdns/gnudip.py b/plinth/modules/dynamicdns/gnudip.py index 7b96eef2e..f176fc9cb 100644 --- a/plinth/modules/dynamicdns/gnudip.py +++ b/plinth/modules/dynamicdns/gnudip.py @@ -5,36 +5,78 @@ GnuDIP client for updating Dynamic DNS records. import hashlib import logging -import socket as socket_module +from html.parser import HTMLParser -BUF_SIZE = 128 -GNUDIP_PORT = 3495 -TIMEOUT = 10 +import requests logger = logging.getLogger(__name__) -def update(server, domain, username, password): - """Update Dynamic DNS record.""" +class MetaTagParser(HTMLParser): + """Extracts name and content from HTML meta tags as a dictionary.""" + + def __init__(self) -> None: + """Initialize the meta tags.""" + super().__init__() + self.meta: dict[str, str] = {} + + def handle_starttag(self, tag: str, + attrs: list[tuple[str, str | None]]) -> None: + """Handle encountering an opening HTML tag during parsing.""" + if tag.lower() == 'meta': + attr_dict = dict(attrs) + name = attr_dict.get('name') + content = attr_dict.get('content') + if name and content: + self.meta[name] = content + + +def _extract_content_from_meta_tags(html: str) -> dict[str, str]: + """Return a dict of {name: content} for all meta tags in the HTML.""" + parser = MetaTagParser() + parser.feed(html) + return parser.meta + + +def _check_required_keys(dictionary: dict[str, str], keys: list[str]) -> None: + missing_keys = [key for key in keys if key not in dictionary] + if missing_keys: + raise ValueError( + f"Missing required keys in response: {', '.join(missing_keys)}") + + +def update(server: str, domain: str, username: str, + password: str) -> tuple[str, str]: + """Update Dynamic DNS record using GnuDIP protocol. + + Protocol documentation: + https://gnudip2.sourceforge.net/gnudip-www/latest/gnudip/html/protocol.html + """ domain = domain.removeprefix(username + '.') password_digest = hashlib.md5(password.encode()).hexdigest() - with socket_module.socket(socket_module.AF_INET, - socket_module.SOCK_STREAM) as socket: - logger.debug('Connecting to %s:%d, timeout %ss', server, GNUDIP_PORT, - TIMEOUT) - socket.settimeout(TIMEOUT) - socket.connect((server, GNUDIP_PORT)) - salt = socket.recv(BUF_SIZE).decode().strip() - salted_digest = password_digest + '.' + salt - final_digest = hashlib.md5(salted_digest.encode()).hexdigest() + http_server = f'https://{server}/gnudip/cgi-bin/gdipupdt.cgi' + response = requests.get(http_server) - update_request = username + ':' + final_digest + ':' + domain + ':2\n' - socket.sendall(update_request.encode()) + salt_response = _extract_content_from_meta_tags(response.text) + _check_required_keys(salt_response, ['salt', 'time', 'sign']) - response = socket.recv(BUF_SIZE).decode().strip() - result, _, new_ip = response.partition(':') - result = (int(result) == 0) + salt = salt_response['salt'] + password_digest = hashlib.md5( + f'{password_digest}.{salt}'.encode()).hexdigest() - new_ip = new_ip if result else None - return result, new_ip + query_params = { + 'salt': salt, + 'time': salt_response['time'], + 'sign': salt_response['sign'], + 'user': username, + 'domn': domain, + 'pass': password_digest, + 'reqc': '2' + } + update_response = requests.get(http_server, params=query_params) + + update_result = _extract_content_from_meta_tags(update_response.text) + _check_required_keys(update_result, ['retc']) + result = (int(update_result['retc']) == 0) + return result, update_result.get('addr') diff --git a/plinth/modules/dynamicdns/tests/test_gnudip.py b/plinth/modules/dynamicdns/tests/test_gnudip.py new file mode 100644 index 000000000..c1ff29f20 --- /dev/null +++ b/plinth/modules/dynamicdns/tests/test_gnudip.py @@ -0,0 +1,76 @@ +from unittest.mock import Mock, patch + +import pytest + +from plinth.modules.dynamicdns import gnudip + +response_to_salt_request = """ + + + + + GnuDIP Update Server + + + + + + + +""" + +response_to_update_request = """ + + + + GnuDIP Update Server + + + + + + +""" + + +def test_parse_meta_tags(): + """Test parsing meta tags from HTML content.""" + expected = { + 'salt': 'gqEuCQYQWD', + 'time': '1746978203', + 'sign': '3d3b1c8ce32db470c6fd79a76f8dafb5' + } + actual = gnudip._extract_content_from_meta_tags(response_to_salt_request) + assert actual == expected + + +def test_check_required_keys_missing(): + """Test check_required_keys raises ValueError if key missing.""" + data = {'foo': 'bar'} + with pytest.raises(ValueError) as excinfo: + gnudip._check_required_keys(data, ['foo', 'baz']) + + assert "Missing required keys" in str(excinfo.value) + + +def test_check_required_keys_present(): + """Test check_required_keys with all keys present does not raise.""" + data = {'foo': 'bar', 'baz': 'qux'} + gnudip._check_required_keys(data, ['foo', 'baz']) + + +def test_update_success(): + """Test GNU DIP update mechanism with HTTP protocol.""" + salt_resp = Mock() + salt_resp.text = response_to_salt_request + update_resp = Mock() + update_resp.text = response_to_update_request + + with patch("plinth.modules.dynamicdns.gnudip.requests.get", + side_effect=[salt_resp, update_resp]) as mock_get: + result, addr = gnudip.update(server="http://www.2mbit.com:80", + domain="gnudip.dyn.mpis.net", + username="gnudip", password="password") + assert result + assert addr == "24.81.172.128" + assert mock_get.call_count == 2