mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-05-27 10:44:33 +00:00
dynamicdns: Switch update client to HTTP protocol
Fixes: #2520 Signed-off-by: Joseph Nuthalapati <njoseph@riseup.net> [sunil: Use params= argument instead of incorrectly contacting query params] [sunil: Recognize error responses properly] [sunil: Minor styling fixes] Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: Sunil Mohan Adapa <sunil@medhas.org>
This commit is contained in:
parent
9fa1e18aa3
commit
8d98345e2d
@ -5,36 +5,78 @@ GnuDIP client for updating Dynamic DNS records.
|
|||||||
|
|
||||||
import hashlib
|
import hashlib
|
||||||
import logging
|
import logging
|
||||||
import socket as socket_module
|
from html.parser import HTMLParser
|
||||||
|
|
||||||
BUF_SIZE = 128
|
import requests
|
||||||
GNUDIP_PORT = 3495
|
|
||||||
TIMEOUT = 10
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def update(server, domain, username, password):
|
class MetaTagParser(HTMLParser):
|
||||||
"""Update Dynamic DNS record."""
|
"""Extracts name and content from HTML meta tags as a dictionary."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
"""Initialize the meta tags."""
|
||||||
|
super().__init__()
|
||||||
|
self.meta: dict[str, str] = {}
|
||||||
|
|
||||||
|
def handle_starttag(self, tag: str,
|
||||||
|
attrs: list[tuple[str, str | None]]) -> None:
|
||||||
|
"""Handle encountering an opening HTML tag during parsing."""
|
||||||
|
if tag.lower() == 'meta':
|
||||||
|
attr_dict = dict(attrs)
|
||||||
|
name = attr_dict.get('name')
|
||||||
|
content = attr_dict.get('content')
|
||||||
|
if name and content:
|
||||||
|
self.meta[name] = content
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_content_from_meta_tags(html: str) -> dict[str, str]:
|
||||||
|
"""Return a dict of {name: content} for all meta tags in the HTML."""
|
||||||
|
parser = MetaTagParser()
|
||||||
|
parser.feed(html)
|
||||||
|
return parser.meta
|
||||||
|
|
||||||
|
|
||||||
|
def _check_required_keys(dictionary: dict[str, str], keys: list[str]) -> None:
|
||||||
|
missing_keys = [key for key in keys if key not in dictionary]
|
||||||
|
if missing_keys:
|
||||||
|
raise ValueError(
|
||||||
|
f"Missing required keys in response: {', '.join(missing_keys)}")
|
||||||
|
|
||||||
|
|
||||||
|
def update(server: str, domain: str, username: str,
|
||||||
|
password: str) -> tuple[str, str]:
|
||||||
|
"""Update Dynamic DNS record using GnuDIP protocol.
|
||||||
|
|
||||||
|
Protocol documentation:
|
||||||
|
https://gnudip2.sourceforge.net/gnudip-www/latest/gnudip/html/protocol.html
|
||||||
|
"""
|
||||||
domain = domain.removeprefix(username + '.')
|
domain = domain.removeprefix(username + '.')
|
||||||
password_digest = hashlib.md5(password.encode()).hexdigest()
|
password_digest = hashlib.md5(password.encode()).hexdigest()
|
||||||
|
|
||||||
with socket_module.socket(socket_module.AF_INET,
|
http_server = f'https://{server}/gnudip/cgi-bin/gdipupdt.cgi'
|
||||||
socket_module.SOCK_STREAM) as socket:
|
response = requests.get(http_server)
|
||||||
logger.debug('Connecting to %s:%d, timeout %ss', server, GNUDIP_PORT,
|
|
||||||
TIMEOUT)
|
|
||||||
socket.settimeout(TIMEOUT)
|
|
||||||
socket.connect((server, GNUDIP_PORT))
|
|
||||||
salt = socket.recv(BUF_SIZE).decode().strip()
|
|
||||||
salted_digest = password_digest + '.' + salt
|
|
||||||
final_digest = hashlib.md5(salted_digest.encode()).hexdigest()
|
|
||||||
|
|
||||||
update_request = username + ':' + final_digest + ':' + domain + ':2\n'
|
salt_response = _extract_content_from_meta_tags(response.text)
|
||||||
socket.sendall(update_request.encode())
|
_check_required_keys(salt_response, ['salt', 'time', 'sign'])
|
||||||
|
|
||||||
response = socket.recv(BUF_SIZE).decode().strip()
|
salt = salt_response['salt']
|
||||||
result, _, new_ip = response.partition(':')
|
password_digest = hashlib.md5(
|
||||||
result = (int(result) == 0)
|
f'{password_digest}.{salt}'.encode()).hexdigest()
|
||||||
|
|
||||||
new_ip = new_ip if result else None
|
query_params = {
|
||||||
return result, new_ip
|
'salt': salt,
|
||||||
|
'time': salt_response['time'],
|
||||||
|
'sign': salt_response['sign'],
|
||||||
|
'user': username,
|
||||||
|
'domn': domain,
|
||||||
|
'pass': password_digest,
|
||||||
|
'reqc': '2'
|
||||||
|
}
|
||||||
|
update_response = requests.get(http_server, params=query_params)
|
||||||
|
|
||||||
|
update_result = _extract_content_from_meta_tags(update_response.text)
|
||||||
|
_check_required_keys(update_result, ['retc'])
|
||||||
|
result = (int(update_result['retc']) == 0)
|
||||||
|
return result, update_result.get('addr')
|
||||||
|
|||||||
76
plinth/modules/dynamicdns/tests/test_gnudip.py
Normal file
76
plinth/modules/dynamicdns/tests/test_gnudip.py
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
from unittest.mock import Mock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from plinth.modules.dynamicdns import gnudip
|
||||||
|
|
||||||
|
response_to_salt_request = """
|
||||||
|
<html>
|
||||||
|
<head>
|
||||||
|
<meta http-equiv="Content-Type" content="text/html; charset=utf8">
|
||||||
|
<title>
|
||||||
|
GnuDIP Update Server
|
||||||
|
</title>
|
||||||
|
<meta name="salt" content="gqEuCQYQWD">
|
||||||
|
<meta name="time" content="1746978203">
|
||||||
|
<meta name="sign" content="3d3b1c8ce32db470c6fd79a76f8dafb5">
|
||||||
|
</head>
|
||||||
|
<body></body>
|
||||||
|
</html>
|
||||||
|
"""
|
||||||
|
|
||||||
|
response_to_update_request = """
|
||||||
|
<html>
|
||||||
|
<head>
|
||||||
|
<title>
|
||||||
|
GnuDIP Update Server
|
||||||
|
</title>
|
||||||
|
<meta name="retc" content="0">
|
||||||
|
<meta name="addr" content="24.81.172.128">
|
||||||
|
</head>
|
||||||
|
<body></body>
|
||||||
|
</html>
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_meta_tags():
|
||||||
|
"""Test parsing meta tags from HTML content."""
|
||||||
|
expected = {
|
||||||
|
'salt': 'gqEuCQYQWD',
|
||||||
|
'time': '1746978203',
|
||||||
|
'sign': '3d3b1c8ce32db470c6fd79a76f8dafb5'
|
||||||
|
}
|
||||||
|
actual = gnudip._extract_content_from_meta_tags(response_to_salt_request)
|
||||||
|
assert actual == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_required_keys_missing():
|
||||||
|
"""Test check_required_keys raises ValueError if key missing."""
|
||||||
|
data = {'foo': 'bar'}
|
||||||
|
with pytest.raises(ValueError) as excinfo:
|
||||||
|
gnudip._check_required_keys(data, ['foo', 'baz'])
|
||||||
|
|
||||||
|
assert "Missing required keys" in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_required_keys_present():
|
||||||
|
"""Test check_required_keys with all keys present does not raise."""
|
||||||
|
data = {'foo': 'bar', 'baz': 'qux'}
|
||||||
|
gnudip._check_required_keys(data, ['foo', 'baz'])
|
||||||
|
|
||||||
|
|
||||||
|
def test_update_success():
|
||||||
|
"""Test GNU DIP update mechanism with HTTP protocol."""
|
||||||
|
salt_resp = Mock()
|
||||||
|
salt_resp.text = response_to_salt_request
|
||||||
|
update_resp = Mock()
|
||||||
|
update_resp.text = response_to_update_request
|
||||||
|
|
||||||
|
with patch("plinth.modules.dynamicdns.gnudip.requests.get",
|
||||||
|
side_effect=[salt_resp, update_resp]) as mock_get:
|
||||||
|
result, addr = gnudip.update(server="http://www.2mbit.com:80",
|
||||||
|
domain="gnudip.dyn.mpis.net",
|
||||||
|
username="gnudip", password="password")
|
||||||
|
assert result
|
||||||
|
assert addr == "24.81.172.128"
|
||||||
|
assert mock_get.call_count == 2
|
||||||
Loading…
x
Reference in New Issue
Block a user