diff --git a/plinth/db/dbconfig.py b/plinth/db/dbconfig.py new file mode 100644 index 000000000..b3e31125b --- /dev/null +++ b/plinth/db/dbconfig.py @@ -0,0 +1,40 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""Utilities for parsing dbconfig-common files with Augeas.""" + +import pathlib + +import augeas + + +def get_credentials(dbconfig_path: str) -> dict[str, str]: + """Parse dbconfig-common file with Augeas Shellvars lens.""" + if not pathlib.Path(dbconfig_path).is_file(): + raise FileNotFoundError(f'DB config not found: {dbconfig_path}') + + aug = _load_augeas(dbconfig_path) + + required = ['dbc_dbuser', 'dbc_dbpass', 'dbc_dbname'] + credentials = {} + for key in required + ['dbc_dbserver']: + credentials[key] = aug.get(key).strip('\'"') + + if not all(credentials.get(key) for key in required): + raise ValueError('Missing required dbconfig-common credentials') + + return { + 'user': credentials['dbc_dbuser'], + 'password': credentials['dbc_dbpass'], + 'database': credentials['dbc_dbname'], + 'host': credentials['dbc_dbserver'] or 'localhost' + } + + +def _load_augeas(config_path: str): + """Initialize Augeas.""" + aug = augeas.Augeas(flags=augeas.Augeas.NO_LOAD + + augeas.Augeas.NO_MODL_AUTOLOAD) + pathstr = str(config_path) + aug.transform('Shellvars', pathstr) + aug.set('/augeas/context', f'/files{pathstr}') + aug.load() + return aug diff --git a/plinth/tests/test_db.py b/plinth/tests/test_db.py index 470ce5688..0e7d46068 100644 --- a/plinth/tests/test_db.py +++ b/plinth/tests/test_db.py @@ -6,6 +6,7 @@ import threading import time from .. import db +from ..db import dbconfig def test_db_lock_no_wait(): @@ -66,3 +67,22 @@ def test_db_lock_release(): end_time = time.time() assert return_value assert end_time - start_time <= 0.23 + + +def test_dbconfig_get_credentials(tmp_path): + """Test that parsing a dbconfig-common file works.""" + file_path = tmp_path / 'test.conf' + configuration = ''' +dbc_dbserver='localhost' +dbc_dbname='miniflux' +dbc_dbuser='miniflux' +dbc_dbpass='gCcNyWjyPjDH' +''' + file_path.write_text(configuration) + credentials = dbconfig.get_credentials(file_path) + assert credentials == { + 'host': 'localhost', + 'database': 'miniflux', + 'user': 'miniflux', + 'password': 'gCcNyWjyPjDH', + }