mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-01-21 07:55:00 +00:00
*: pylint: Explicitly specify encoding when open a file
This is recommended by PEP-0597: https://peps.python.org/pep-0597/ Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
parent
05815bc992
commit
3c7bc4a192
@ -51,12 +51,12 @@ def subcommand_create_key_pair(_):
|
||||
pkey = crypto.PKey()
|
||||
pkey.generate_key(crypto.TYPE_RSA, 4096)
|
||||
|
||||
with open(private_key_file, 'w') as priv_key_file:
|
||||
with open(private_key_file, 'w', encoding='utf-8') as priv_key_file:
|
||||
priv_key = crypto.dump_privatekey(crypto.FILETYPE_PEM,
|
||||
pkey).decode()
|
||||
priv_key_file.write(priv_key)
|
||||
|
||||
with open(public_key_file, 'w') as pub_key_file:
|
||||
with open(public_key_file, 'w', encoding='utf-8') as pub_key_file:
|
||||
pub_key = crypto.dump_publickey(crypto.FILETYPE_PEM, pkey).decode()
|
||||
pub_key_file.write(pub_key)
|
||||
|
||||
@ -93,7 +93,7 @@ def subcommand_generate_ticket(arguments):
|
||||
uid = arguments.uid
|
||||
private_key_file = arguments.private_key_file
|
||||
tokens = arguments.tokens
|
||||
with open(private_key_file, 'r') as fil:
|
||||
with open(private_key_file, 'r', encoding='utf-8') as fil:
|
||||
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, fil.read().encode())
|
||||
valid_until = minutes_from_now(12 * 60)
|
||||
grace_period = minutes_from_now(11 * 60)
|
||||
|
||||
@ -34,7 +34,7 @@ def parse_arguments():
|
||||
def subcommand_setup(arguments):
|
||||
"""Setup BIND configuration."""
|
||||
if arguments.old_version == 0:
|
||||
with open(CONFIG_FILE, "w") as conf_file:
|
||||
with open(CONFIG_FILE, 'w', encoding='utf-8') as conf_file:
|
||||
conf_file.write(DEFAULT_CONFIG)
|
||||
|
||||
Path(ZONES_DIR).mkdir(exist_ok=True, parents=True)
|
||||
|
||||
@ -40,7 +40,7 @@ def subcommand_set_home_page(arguments):
|
||||
|
||||
redirect_rule = 'RedirectMatch "^/$" "{}"\n'.format(arguments.homepage)
|
||||
|
||||
with open(conf_file_path, 'w') as conf_file:
|
||||
with open(conf_file_path, 'w', encoding='utf-8') as conf_file:
|
||||
conf_file.write(redirect_rule)
|
||||
|
||||
action_utils.webserver_enable('freedombox-apache-homepage')
|
||||
|
||||
@ -135,7 +135,8 @@ def subcommand_set_configuration(arguments):
|
||||
def subcommand_setup(_):
|
||||
"""Perform initial setup for deluge."""
|
||||
|
||||
with open(DELUGE_WEB_SYSTEMD_SERVICE_PATH, 'w') as file_handle:
|
||||
with open(DELUGE_WEB_SYSTEMD_SERVICE_PATH, 'w',
|
||||
encoding='utf-8') as file_handle:
|
||||
file_handle.write(DELUGE_WEB_SYSTEMD_SERVICE)
|
||||
|
||||
_set_deluged_daemon_options()
|
||||
|
||||
@ -107,7 +107,7 @@ def parse_arguments():
|
||||
|
||||
def subcommand_get_configuration(_):
|
||||
"""Return the current configuration, specifically domains configured."""
|
||||
with open(EJABBERD_CONFIG, 'r') as file_handle:
|
||||
with open(EJABBERD_CONFIG, 'r', encoding='utf-8') as file_handle:
|
||||
conf = yaml.load(file_handle)
|
||||
|
||||
print(json.dumps({'domains': conf['hosts']}))
|
||||
@ -126,7 +126,7 @@ def subcommand_pre_install(arguments):
|
||||
|
||||
def subcommand_setup(arguments):
|
||||
"""Enabled LDAP authentication"""
|
||||
with open(EJABBERD_CONFIG, 'r') as file_handle:
|
||||
with open(EJABBERD_CONFIG, 'r', encoding='utf-8') as file_handle:
|
||||
conf = yaml.load(file_handle)
|
||||
|
||||
for listen_port in conf['listen']:
|
||||
@ -140,7 +140,7 @@ def subcommand_setup(arguments):
|
||||
conf['ldap_base'] = scalarstring.DoubleQuotedScalarString(
|
||||
'ou=users,dc=thisbox')
|
||||
|
||||
with open(EJABBERD_CONFIG, 'w') as file_handle:
|
||||
with open(EJABBERD_CONFIG, 'w', encoding='utf-8') as file_handle:
|
||||
yaml.dump(conf, file_handle)
|
||||
|
||||
upgrade_config(arguments.domainname)
|
||||
@ -157,7 +157,7 @@ def upgrade_config(domain):
|
||||
if not current_version:
|
||||
print('Warning: Unable to get ejabberd version.')
|
||||
|
||||
with open(EJABBERD_CONFIG, 'r') as file_handle:
|
||||
with open(EJABBERD_CONFIG, 'r', encoding='utf-8') as file_handle:
|
||||
conf = yaml.load(file_handle)
|
||||
|
||||
# Check if `iqdisc` is present and remove it
|
||||
@ -190,7 +190,7 @@ def upgrade_config(domain):
|
||||
listen_port['certfile'] = cert_file
|
||||
|
||||
# Write changes back to the file
|
||||
with open(EJABBERD_CONFIG, 'w') as file_handle:
|
||||
with open(EJABBERD_CONFIG, 'w', encoding='utf-8') as file_handle:
|
||||
yaml.dump(conf, file_handle)
|
||||
|
||||
|
||||
@ -250,7 +250,7 @@ def subcommand_get_domains(_):
|
||||
print('ejabberdctl not found. Is ejabberd installed?')
|
||||
return
|
||||
|
||||
with open(EJABBERD_CONFIG, 'r') as file_handle:
|
||||
with open(EJABBERD_CONFIG, 'r', encoding='utf-8') as file_handle:
|
||||
conf = yaml.load(file_handle)
|
||||
|
||||
print(json.dumps(conf['hosts']))
|
||||
@ -268,14 +268,14 @@ def subcommand_add_domain(arguments):
|
||||
domainname = arguments.domainname
|
||||
|
||||
# Add updated domainname to ejabberd hosts list.
|
||||
with open(EJABBERD_CONFIG, 'r') as file_handle:
|
||||
with open(EJABBERD_CONFIG, 'r', encoding='utf-8') as file_handle:
|
||||
conf = yaml.load(file_handle)
|
||||
|
||||
conf['hosts'].append(scalarstring.DoubleQuotedScalarString(domainname))
|
||||
|
||||
conf['hosts'] = list(set(conf['hosts']))
|
||||
|
||||
with open(EJABBERD_CONFIG, 'w') as file_handle:
|
||||
with open(EJABBERD_CONFIG, 'w', encoding='utf-8') as file_handle:
|
||||
yaml.dump(conf, file_handle)
|
||||
|
||||
# Restarting ejabberd is handled by letsencrypt-ejabberd component.
|
||||
@ -290,19 +290,19 @@ def subcommand_set_domains(arguments):
|
||||
print('ejabberdctl not found. Is ejabberd installed?')
|
||||
return
|
||||
|
||||
with open(EJABBERD_CONFIG, 'r') as file_handle:
|
||||
with open(EJABBERD_CONFIG, 'r', encoding='utf-8') as file_handle:
|
||||
conf = yaml.load(file_handle)
|
||||
|
||||
conf['hosts'] = arguments.domains
|
||||
|
||||
with open(EJABBERD_CONFIG, 'w') as file_handle:
|
||||
with open(EJABBERD_CONFIG, 'w', encoding='utf-8') as file_handle:
|
||||
yaml.dump(conf, file_handle)
|
||||
|
||||
|
||||
def subcommand_mam(argument):
|
||||
"""Enable, disable, or get status of Message Archive Management (MAM)."""
|
||||
|
||||
with open(EJABBERD_CONFIG, 'r') as file_handle:
|
||||
with open(EJABBERD_CONFIG, 'r', encoding='utf-8') as file_handle:
|
||||
conf = yaml.load(file_handle)
|
||||
|
||||
if 'modules' not in conf:
|
||||
@ -340,7 +340,7 @@ def subcommand_mam(argument):
|
||||
print("Unknown command: %s" % argument.command)
|
||||
return
|
||||
|
||||
with open(EJABBERD_CONFIG, 'w') as file_handle:
|
||||
with open(EJABBERD_CONFIG, 'w', encoding='utf-8') as file_handle:
|
||||
yaml.dump(conf, file_handle)
|
||||
|
||||
if action_utils.service_is_running('ejabberd'):
|
||||
@ -370,7 +370,7 @@ def _generate_uris(services: list[dict]) -> list[str]:
|
||||
|
||||
def subcommand_get_turn_config(_):
|
||||
"""Get the latest STUN/TURN configuration in JSON format."""
|
||||
with open(EJABBERD_CONFIG, 'r') as file_handle:
|
||||
with open(EJABBERD_CONFIG, 'r', encoding='utf-8') as file_handle:
|
||||
conf = yaml.load(file_handle)
|
||||
|
||||
mod_stun_disco_config = conf['modules']['mod_stun_disco']
|
||||
@ -405,12 +405,12 @@ def subcommand_configure_turn(arguments):
|
||||
'services': [_generate_service(uri) for uri in uris]
|
||||
}
|
||||
|
||||
with open(EJABBERD_CONFIG, 'r') as file_handle:
|
||||
with open(EJABBERD_CONFIG, 'r', encoding='utf-8') as file_handle:
|
||||
conf = yaml.load(file_handle)
|
||||
|
||||
conf['modules']['mod_stun_disco'] = mod_stun_disco_config
|
||||
|
||||
with open(EJABBERD_CONFIG, 'w') as file_handle:
|
||||
with open(EJABBERD_CONFIG, 'w', encoding='utf-8') as file_handle:
|
||||
yaml.dump(conf, file_handle)
|
||||
|
||||
if arguments.managed:
|
||||
|
||||
@ -172,7 +172,8 @@ def _clone_with_progress_report(url, repo_dir):
|
||||
elapsed = _clone_status_line_to_percent(line)
|
||||
if elapsed is not None:
|
||||
try:
|
||||
with open(status_file, 'w') as file_handle:
|
||||
with open(status_file, 'w',
|
||||
encoding='utf-8') as file_handle:
|
||||
file_handle.write(elapsed)
|
||||
except OSError as error:
|
||||
errors.append(str(error))
|
||||
@ -205,7 +206,7 @@ def _prepare_clone_repo(arguments):
|
||||
try:
|
||||
if arguments.is_private:
|
||||
_set_access_status(repo_name, 'private')
|
||||
with open(status_file, 'w') as file_handle:
|
||||
with open(status_file, 'w', encoding='utf-8') as file_handle:
|
||||
file_handle.write('0')
|
||||
except OSError:
|
||||
shutil.rmtree(repo_dir)
|
||||
@ -289,7 +290,7 @@ def _get_repo_description(repo):
|
||||
"""Set description of the repository."""
|
||||
description_file = os.path.join(GIT_REPO_PATH, repo, 'description')
|
||||
if os.path.exists(description_file):
|
||||
with open(description_file, 'r') as file_handle:
|
||||
with open(description_file, 'r', encoding='utf-8') as file_handle:
|
||||
description = file_handle.read()
|
||||
else:
|
||||
description = ''
|
||||
@ -300,7 +301,7 @@ def _get_repo_description(repo):
|
||||
def _set_repo_description(repo, description):
|
||||
"""Set description of the repository."""
|
||||
description_file = os.path.join(GIT_REPO_PATH, repo, 'description')
|
||||
with open(description_file, 'w') as file_handle:
|
||||
with open(description_file, 'w', encoding='utf-8') as file_handle:
|
||||
file_handle.write(description)
|
||||
|
||||
|
||||
@ -326,7 +327,7 @@ def _set_repo_owner(repo, owner):
|
||||
config.add_section('gitweb')
|
||||
|
||||
config['gitweb']['owner'] = owner
|
||||
with open(repo_config, 'w') as file_handle:
|
||||
with open(repo_config, 'w', encoding='utf-8') as file_handle:
|
||||
config.write(file_handle)
|
||||
|
||||
|
||||
@ -343,7 +344,7 @@ def _set_access_status(repo, status):
|
||||
"""Set repository as private or public"""
|
||||
private_file = os.path.join(GIT_REPO_PATH, repo, 'private')
|
||||
if status == 'private':
|
||||
open(private_file, 'a')
|
||||
open(private_file, 'a', encoding='utf-8')
|
||||
elif status == 'public':
|
||||
if os.path.exists(private_file):
|
||||
os.remove(private_file)
|
||||
|
||||
@ -59,7 +59,8 @@ def subcommand_setup(_):
|
||||
def get_title(site):
|
||||
"""Get blog or wiki title"""
|
||||
try:
|
||||
with open(os.path.join(SITE_PATH, site, 'index.html')) as index_file:
|
||||
with open(os.path.join(SITE_PATH, site, 'index.html'),
|
||||
encoding='utf-8') as index_file:
|
||||
match = re.search(r'<title>(.*)</title>', index_file.read())
|
||||
if match:
|
||||
return match[1]
|
||||
|
||||
@ -130,10 +130,10 @@ def _kill_daemon():
|
||||
def subcommand_setup(_):
|
||||
"""Configure infinoted after install."""
|
||||
if not os.path.isfile(CONF_PATH):
|
||||
with open(CONF_PATH, 'w') as file_handle:
|
||||
with open(CONF_PATH, 'w', encoding='utf-8') as file_handle:
|
||||
file_handle.write(CONF)
|
||||
|
||||
with open(SYSTEMD_SERVICE_PATH, 'w') as file_handle:
|
||||
with open(SYSTEMD_SERVICE_PATH, 'w', encoding='utf-8') as file_handle:
|
||||
file_handle.write(SYSTEMD_SERVICE)
|
||||
|
||||
subprocess.check_call(['systemctl', 'daemon-reload'])
|
||||
|
||||
@ -22,10 +22,10 @@ def parse_arguments():
|
||||
|
||||
def subcommand_setup(_):
|
||||
"""Configure Janus server."""
|
||||
with open(JANUS_CONF_PATH, 'r') as config_file:
|
||||
with open(JANUS_CONF_PATH, 'r', encoding='utf-8') as config_file:
|
||||
config_lines = config_file.readlines()
|
||||
|
||||
with open(JANUS_CONF_PATH, 'w') as config_file:
|
||||
with open(JANUS_CONF_PATH, 'w', encoding='utf-8') as config_file:
|
||||
for line in config_lines:
|
||||
if '#rtp_port_range' in line:
|
||||
config_file.write("\trtp_port_range = \"50176-51199\"\n")
|
||||
|
||||
@ -473,7 +473,7 @@ def setup_webserver_config(domain, webserver_change):
|
||||
if os.path.isfile(file_name):
|
||||
os.rename(file_name, file_name + '.fbx-bak')
|
||||
|
||||
with open(file_name, 'w') as file_handle:
|
||||
with open(file_name, 'w', encoding='utf-8') as file_handle:
|
||||
file_handle.write(APACHE_CONFIGURATION.format(domain=domain))
|
||||
|
||||
webserver_change.enable('freedombox-tls-site-macro', kind='config')
|
||||
|
||||
@ -73,11 +73,11 @@ def parse_arguments():
|
||||
|
||||
def subcommand_post_install(_):
|
||||
"""Perform post installation configuration."""
|
||||
with open(STATIC_CONF_PATH, 'w') as static_conf_file:
|
||||
with open(STATIC_CONF_PATH, 'w', encoding='utf-8') as static_conf_file:
|
||||
yaml.dump(STATIC_CONFIG, static_conf_file)
|
||||
|
||||
# start with listener config from original homeserver.yaml
|
||||
with open(ORIG_CONF_PATH) as orig_conf_file:
|
||||
with open(ORIG_CONF_PATH, encoding='utf-8') as orig_conf_file:
|
||||
orig_config = yaml.load(orig_conf_file)
|
||||
|
||||
listeners = orig_config['listeners']
|
||||
@ -86,7 +86,8 @@ def subcommand_post_install(_):
|
||||
listener['bind_addresses'] = ['::', '0.0.0.0']
|
||||
listener.pop('bind_address', None)
|
||||
|
||||
with open(LISTENERS_CONF_PATH, 'w') as listeners_conf_file:
|
||||
with open(LISTENERS_CONF_PATH, 'w',
|
||||
encoding='utf-8') as listeners_conf_file:
|
||||
yaml.dump({'listeners': listeners}, listeners_conf_file)
|
||||
|
||||
|
||||
@ -100,11 +101,11 @@ def subcommand_setup(arguments):
|
||||
def subcommand_public_registration(argument):
|
||||
"""Enable/Disable/Status public user registration."""
|
||||
try:
|
||||
with open(REGISTRATION_CONF_PATH) as reg_conf_file:
|
||||
with open(REGISTRATION_CONF_PATH, encoding='utf-8') as reg_conf_file:
|
||||
config = yaml.load(reg_conf_file)
|
||||
except FileNotFoundError:
|
||||
# Check if its set in original conffile.
|
||||
with open(ORIG_CONF_PATH) as orig_conf_file:
|
||||
with open(ORIG_CONF_PATH, encoding='utf-8') as orig_conf_file:
|
||||
orig_config = yaml.load(orig_conf_file)
|
||||
config = {
|
||||
'enable_registration':
|
||||
@ -123,7 +124,7 @@ def subcommand_public_registration(argument):
|
||||
elif argument.command == 'disable':
|
||||
config['enable_registration'] = False
|
||||
|
||||
with open(REGISTRATION_CONF_PATH, 'w') as reg_conf_file:
|
||||
with open(REGISTRATION_CONF_PATH, 'w', encoding='utf-8') as reg_conf_file:
|
||||
yaml.dump(config, reg_conf_file)
|
||||
|
||||
action_utils.service_try_restart('matrix-synapse')
|
||||
@ -156,7 +157,7 @@ def _set_turn_config(conf_file):
|
||||
'turn_allow_guests': True
|
||||
}
|
||||
|
||||
with open(conf_file, 'w+') as turn_config:
|
||||
with open(conf_file, 'w+', encoding='utf-8') as turn_config:
|
||||
yaml.dump(config, turn_config)
|
||||
|
||||
|
||||
|
||||
@ -108,7 +108,7 @@ def subcommand_setup(_):
|
||||
|
||||
def include_custom_config():
|
||||
"""Include FreedomBox specific configuration in LocalSettings.php."""
|
||||
with open(LOCAL_SETTINGS_CONF, 'r') as conf_file:
|
||||
with open(LOCAL_SETTINGS_CONF, 'r', encoding='utf-8') as conf_file:
|
||||
lines = conf_file.readlines()
|
||||
|
||||
static_settings_index = None
|
||||
@ -129,7 +129,7 @@ def include_custom_config():
|
||||
settings_index,
|
||||
'include dirname(__FILE__)."/FreedomBoxStaticSettings.php";\n')
|
||||
|
||||
with open(LOCAL_SETTINGS_CONF, 'w') as conf_file:
|
||||
with open(LOCAL_SETTINGS_CONF, 'w', encoding='utf-8') as conf_file:
|
||||
conf_file.writelines(lines)
|
||||
|
||||
|
||||
@ -154,7 +154,7 @@ def subcommand_update(_):
|
||||
def subcommand_public_registrations(arguments):
|
||||
"""Enable or Disable public registrations for MediaWiki."""
|
||||
|
||||
with open(CONF_FILE, 'r') as conf_file:
|
||||
with open(CONF_FILE, 'r', encoding='utf-8') as conf_file:
|
||||
lines = conf_file.readlines()
|
||||
|
||||
def is_pub_reg_line(line):
|
||||
@ -167,7 +167,7 @@ def subcommand_public_registrations(arguments):
|
||||
else:
|
||||
print('disabled')
|
||||
else:
|
||||
with open(CONF_FILE, 'w') as conf_file:
|
||||
with open(CONF_FILE, 'w', encoding='utf-8') as conf_file:
|
||||
for line in lines:
|
||||
if is_pub_reg_line(line):
|
||||
words = line.split()
|
||||
@ -182,7 +182,7 @@ def subcommand_public_registrations(arguments):
|
||||
|
||||
def subcommand_private_mode(arguments):
|
||||
"""Enable or Disable Private mode for wiki"""
|
||||
with open(CONF_FILE, 'r') as conf_file:
|
||||
with open(CONF_FILE, 'r', encoding='utf-8') as conf_file:
|
||||
lines = conf_file.readlines()
|
||||
|
||||
def is_read_line(line):
|
||||
@ -195,7 +195,7 @@ def subcommand_private_mode(arguments):
|
||||
else:
|
||||
print('disabled')
|
||||
else:
|
||||
with open(CONF_FILE, 'w') as conf_file:
|
||||
with open(CONF_FILE, 'w', encoding='utf-8') as conf_file:
|
||||
conf_value = 'false;' if arguments.command == 'enable' else 'true;'
|
||||
for line in lines:
|
||||
if is_read_line(line):
|
||||
@ -212,7 +212,7 @@ def subcommand_private_mode(arguments):
|
||||
|
||||
def _update_setting(setting_name, setting_line):
|
||||
"""Update the value of one setting in the config file."""
|
||||
with open(CONF_FILE, 'r') as conf_file:
|
||||
with open(CONF_FILE, 'r', encoding='utf-8') as conf_file:
|
||||
lines = conf_file.readlines()
|
||||
|
||||
inserted = False
|
||||
@ -225,7 +225,7 @@ def _update_setting(setting_name, setting_line):
|
||||
if not inserted:
|
||||
lines.append(setting_line)
|
||||
|
||||
with open(CONF_FILE, 'w') as conf_file:
|
||||
with open(CONF_FILE, 'w', encoding='utf-8') as conf_file:
|
||||
conf_file.writelines(lines)
|
||||
|
||||
|
||||
|
||||
@ -65,7 +65,8 @@ def subcommand_setup(arguments):
|
||||
monitor changes in large media-dirs.
|
||||
"""
|
||||
_undo_old_configuration_changes()
|
||||
with open('/etc/sysctl.d/50-freedombox-minidlna.conf', 'w') as conf:
|
||||
with open('/etc/sysctl.d/50-freedombox-minidlna.conf', 'w',
|
||||
encoding='utf-8') as conf:
|
||||
conf.write(SYSCTL_CONF)
|
||||
|
||||
subprocess.run(['systemctl', 'restart', 'systemd-sysctl'], check=True)
|
||||
@ -97,7 +98,7 @@ def replace_in_config_file(file_path, pattern, subst):
|
||||
"""
|
||||
temp_file, temp_file_path = mkstemp()
|
||||
with fdopen(temp_file, 'w') as new_file:
|
||||
with open(file_path) as old_file:
|
||||
with open(file_path, encoding='utf-8') as old_file:
|
||||
for line in old_file:
|
||||
new_file.write(line.replace(pattern, subst))
|
||||
|
||||
|
||||
@ -141,7 +141,8 @@ def parse_arguments():
|
||||
def _is_using_ecc():
|
||||
"""Return whether the service is using ECC."""
|
||||
if os.path.exists(SERVER_CONFIGURATION_PATH):
|
||||
with open(SERVER_CONFIGURATION_PATH, 'r') as file_handle:
|
||||
with open(SERVER_CONFIGURATION_PATH, 'r',
|
||||
encoding='utf-8') as file_handle:
|
||||
for line in file_handle:
|
||||
if line.strip() == 'dh none':
|
||||
return True
|
||||
@ -169,7 +170,7 @@ def subcommand_setup(_):
|
||||
|
||||
def _write_server_config():
|
||||
"""Write server configuration."""
|
||||
with open(SERVER_CONFIGURATION_PATH, 'w') as file_handle:
|
||||
with open(SERVER_CONFIGURATION_PATH, 'w', encoding='utf-8') as file_handle:
|
||||
file_handle.write(SERVER_CONFIGURATION)
|
||||
|
||||
|
||||
@ -280,7 +281,7 @@ def set_unique_subject(value):
|
||||
|
||||
def _read_file(filename):
|
||||
"""Return the entire contents of a file as string."""
|
||||
with open(filename, 'r') as file_handle:
|
||||
with open(filename, 'r', encoding='utf-8') as file_handle:
|
||||
return ''.join(file_handle.readlines())
|
||||
|
||||
|
||||
|
||||
@ -118,7 +118,7 @@ def _assert_managed_packages(module, packages):
|
||||
cfg.read()
|
||||
module_file = os.path.join(cfg.config_dir, 'modules-enabled', module)
|
||||
|
||||
with open(module_file, 'r') as file_handle:
|
||||
with open(module_file, 'r', encoding='utf-8') as file_handle:
|
||||
module_path = file_handle.read().strip()
|
||||
|
||||
module = import_module(module_path)
|
||||
|
||||
@ -156,7 +156,7 @@ def subcommand_remove_service(arguments):
|
||||
for path in paths:
|
||||
filepath = _convert_augeas_path_to_filepath(path)
|
||||
service_found = False
|
||||
with open(filepath, 'r') as file:
|
||||
with open(filepath, 'r', encoding='utf-8') as file:
|
||||
lines = file.readlines()
|
||||
for i, line in enumerate(lines):
|
||||
if line.startswith('service_on') and \
|
||||
@ -165,7 +165,7 @@ def subcommand_remove_service(arguments):
|
||||
service_found = True
|
||||
break
|
||||
if service_found:
|
||||
with open(filepath, 'w') as file:
|
||||
with open(filepath, 'w', encoding='utf-8') as file:
|
||||
file.writelines(lines)
|
||||
# abort to only allow deleting one service
|
||||
break
|
||||
@ -192,7 +192,7 @@ def _add_service(service):
|
||||
# TODO: after adding a service, augeas fails writing the config;
|
||||
# so add the service_on entry manually instead
|
||||
path = _convert_augeas_path_to_filepath(root)
|
||||
with open(path, 'a') as servicefile:
|
||||
with open(path, 'a', encoding='utf-8') as servicefile:
|
||||
line = "\nservice_on = %s\n" % utils.convert_service_to_string(service)
|
||||
servicefile.write(line)
|
||||
|
||||
|
||||
@ -24,7 +24,7 @@ def parse_arguments():
|
||||
def subcommand_set_domain(arguments):
|
||||
"""Write a file containing domain name."""
|
||||
domain_file = pathlib.Path('/var/lib/quassel/domain-freedombox')
|
||||
domain_file.write_text(arguments.domain_name)
|
||||
domain_file.write_text(arguments.domain_name, encoding='utf-8')
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
@ -42,14 +42,14 @@ def subcommand_pre_install(_):
|
||||
def subcommand_setup(_):
|
||||
"""Add FreedomBox configuration and include from main configuration."""
|
||||
if not _config_file.exists():
|
||||
_config_file.write_text('<?php\n')
|
||||
_config_file.write_text('<?php\n', encoding='utf-8')
|
||||
|
||||
base_config = pathlib.Path('/etc/roundcube/config.inc.php')
|
||||
lines = base_config.read_text().splitlines()
|
||||
lines = base_config.read_text(encoding='utf-8').splitlines()
|
||||
exists = any((str(_config_file) in line for line in lines))
|
||||
if not exists:
|
||||
lines.append(f'include_once("{_config_file}");\n')
|
||||
base_config.write_text('\n'.join(lines))
|
||||
base_config.write_text('\n'.join(lines), encoding='utf-8')
|
||||
|
||||
|
||||
def subcommand_get_config(_):
|
||||
@ -57,7 +57,7 @@ def subcommand_get_config(_):
|
||||
pattern = r'\s*\$config\[\s*\'([^\']*)\'\s*\]\s*=\s*\'([^\']*)\'\s*;'
|
||||
_config = {}
|
||||
try:
|
||||
for line in _config_file.read_text().splitlines():
|
||||
for line in _config_file.read_text(encoding='utf-8').splitlines():
|
||||
match = re.match(pattern, line)
|
||||
if match:
|
||||
_config[match.group(1)] = match.group(2)
|
||||
@ -81,7 +81,7 @@ $config['smtp_port'] = 25;
|
||||
$config['smtp_helo_host'] = 'localhost';
|
||||
'''
|
||||
|
||||
_config_file.write_text(config)
|
||||
_config_file.write_text(config, encoding='utf-8')
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
@ -237,8 +237,8 @@ def _set_open_share_permissions(directory):
|
||||
def _use_config_file(conf_file):
|
||||
"""Set samba configuration file location."""
|
||||
import augeas
|
||||
aug = augeas.Augeas(
|
||||
flags=augeas.Augeas.NO_LOAD + augeas.Augeas.NO_MODL_AUTOLOAD)
|
||||
aug = augeas.Augeas(flags=augeas.Augeas.NO_LOAD +
|
||||
augeas.Augeas.NO_MODL_AUTOLOAD)
|
||||
aug.set('/augeas/load/Shellvars/lens', 'Shellvars.lns')
|
||||
aug.set('/augeas/load/Shellvars/incl[last() + 1]', DEFAULT_FILE)
|
||||
aug.load()
|
||||
@ -301,7 +301,7 @@ def subcommand_get_users(_):
|
||||
def subcommand_setup(_):
|
||||
"""Configure samba, use custom samba config file."""
|
||||
from plinth import action_utils
|
||||
with open(CONF_PATH, 'w') as file_handle:
|
||||
with open(CONF_PATH, 'w', encoding='utf-8') as file_handle:
|
||||
file_handle.write(CONF)
|
||||
_use_config_file(CONF_PATH)
|
||||
os.makedirs('/var/lib/freedombox', exist_ok=True)
|
||||
@ -313,7 +313,7 @@ def subcommand_setup(_):
|
||||
def subcommand_dump_shares(_):
|
||||
"""Dump registy share configuration."""
|
||||
os.makedirs(os.path.dirname(SHARES_CONF_BACKUP_FILE), exist_ok=True)
|
||||
with open(SHARES_CONF_BACKUP_FILE, 'w') as backup_file:
|
||||
with open(SHARES_CONF_BACKUP_FILE, 'w', encoding='utf-8') as backup_file:
|
||||
command = ['net', 'conf', 'list']
|
||||
subprocess.run(command, stdout=backup_file, check=True)
|
||||
|
||||
|
||||
@ -125,7 +125,7 @@ def read_settings():
|
||||
|
||||
def write_settings(settings):
|
||||
"""Write settings from dictionary to YAML config file."""
|
||||
with open(SETTINGS_FILE, 'w') as settings_file:
|
||||
with open(SETTINGS_FILE, 'w', encoding='utf-8') as settings_file:
|
||||
yaml.dump(settings, settings_file)
|
||||
|
||||
|
||||
@ -171,7 +171,7 @@ def subcommand_setup(_):
|
||||
|
||||
def subcommand_enable_public_access(_):
|
||||
"""Enable public access to the SearX application."""
|
||||
open(PUBLIC_ACCESS_SETTING_FILE, 'w').close()
|
||||
open(PUBLIC_ACCESS_SETTING_FILE, 'w', encoding='utf-8').close()
|
||||
|
||||
|
||||
def subcommand_disable_public_access(_):
|
||||
|
||||
@ -34,16 +34,16 @@ def subcommand_enable_restricted_access(_):
|
||||
except FileExistsError:
|
||||
pass
|
||||
|
||||
with open(ACCESS_CONF_FILE, 'w') as conffile:
|
||||
with open(ACCESS_CONF_FILE, 'w', encoding='utf-8') as conffile:
|
||||
conffile.write(ACCESS_CONF_SNIPPET + '\n')
|
||||
|
||||
|
||||
def subcommand_disable_restricted_access(_):
|
||||
"""Don't restrict console login to users in admin or sudo group."""
|
||||
with open(ACCESS_CONF_FILE_OLD, 'r') as conffile:
|
||||
with open(ACCESS_CONF_FILE_OLD, 'r', encoding='utf-8') as conffile:
|
||||
lines = conffile.readlines()
|
||||
|
||||
with open(ACCESS_CONF_FILE_OLD, 'w') as conffile:
|
||||
with open(ACCESS_CONF_FILE_OLD, 'w', encoding='utf-8') as conffile:
|
||||
for line in lines:
|
||||
if line.strip() not in ACCESS_CONF_SNIPPETS:
|
||||
conffile.write(line)
|
||||
|
||||
@ -83,7 +83,7 @@ def subcommand_setup(_):
|
||||
def subcommand_get_config(_):
|
||||
"""Read and print Shadowsocks configuration."""
|
||||
try:
|
||||
print(open(SHADOWSOCKS_CONFIG_SYMLINK, 'r').read())
|
||||
print(open(SHADOWSOCKS_CONFIG_SYMLINK, 'r', encoding='utf-8').read())
|
||||
except Exception:
|
||||
sys.exit(1)
|
||||
|
||||
@ -91,7 +91,8 @@ def subcommand_get_config(_):
|
||||
def _merge_config(config):
|
||||
"""Write merged configuration into file."""
|
||||
try:
|
||||
current_config = open(SHADOWSOCKS_CONFIG_SYMLINK, 'r').read()
|
||||
current_config = open(SHADOWSOCKS_CONFIG_SYMLINK, 'r',
|
||||
encoding='utf-8').read()
|
||||
current_config = json.loads(current_config)
|
||||
except (OSError, json.JSONDecodeError):
|
||||
current_config = {}
|
||||
@ -99,7 +100,7 @@ def _merge_config(config):
|
||||
new_config = current_config
|
||||
new_config.update(config)
|
||||
new_config = json.dumps(new_config, indent=4, sort_keys=True)
|
||||
open(SHADOWSOCKS_CONFIG_SYMLINK, 'w').write(new_config)
|
||||
open(SHADOWSOCKS_CONFIG_SYMLINK, 'w', encoding='utf-8').write(new_config)
|
||||
|
||||
|
||||
def subcommand_merge_config(_):
|
||||
|
||||
@ -107,7 +107,7 @@ def subcommand_get_keys(arguments):
|
||||
|
||||
path = os.path.join(get_user_homedir(user), '.ssh', 'authorized_keys')
|
||||
try:
|
||||
with open(path, 'r') as file_handle:
|
||||
with open(path, 'r', encoding='utf-8') as file_handle:
|
||||
print(file_handle.read())
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
@ -130,7 +130,7 @@ def subcommand_set_keys(arguments):
|
||||
os.makedirs(ssh_folder)
|
||||
shutil.chown(ssh_folder, user, 'users')
|
||||
|
||||
with open(key_file_path, 'w') as file_handle:
|
||||
with open(key_file_path, 'w', encoding='utf-8') as file_handle:
|
||||
file_handle.write(arguments.keys)
|
||||
|
||||
shutil.chown(key_file_path, user, 'users')
|
||||
|
||||
14
actions/tor
14
actions/tor
@ -83,9 +83,11 @@ def _first_time_setup():
|
||||
|
||||
# Remove line starting with +SocksPort, since our augeas lens
|
||||
# doesn't handle it correctly.
|
||||
with open('/etc/tor/instances/plinth/torrc', 'r') as torrc:
|
||||
with open('/etc/tor/instances/plinth/torrc', 'r',
|
||||
encoding='utf-8') as torrc:
|
||||
torrc_lines = torrc.readlines()
|
||||
with open('/etc/tor/instances/plinth/torrc', 'w') as torrc:
|
||||
with open('/etc/tor/instances/plinth/torrc', 'w',
|
||||
encoding='utf-8') as torrc:
|
||||
for line in torrc_lines:
|
||||
if not line.startswith('+'):
|
||||
torrc.write(line)
|
||||
@ -261,7 +263,7 @@ def _get_ports():
|
||||
pass
|
||||
|
||||
try:
|
||||
with open(TOR_STATE_FILE, 'r') as state_file:
|
||||
with open(TOR_STATE_FILE, 'r', encoding='utf-8') as state_file:
|
||||
for line in state_file:
|
||||
matches = re.match(
|
||||
r'^\s*TransportProxy\s+(\S*)\s+\S+:(\d+)\s*$', line)
|
||||
@ -315,7 +317,8 @@ def _get_hidden_service(aug=None):
|
||||
hs_status = 'Not Configured'
|
||||
else:
|
||||
try:
|
||||
with open(os.path.join(hs_dir, 'hostname'), 'r') as conf_file:
|
||||
with open(os.path.join(hs_dir, 'hostname'), 'r',
|
||||
encoding='utf-8') as conf_file:
|
||||
hs_hostname = conf_file.read().strip()
|
||||
hs_enabled = True
|
||||
except Exception:
|
||||
@ -482,7 +485,8 @@ def _update_port(name, number):
|
||||
</service>
|
||||
"""
|
||||
try:
|
||||
with open(SERVICE_FILE.format(name), 'w') as service_file:
|
||||
with open(SERVICE_FILE.format(name), 'w',
|
||||
encoding='utf-8') as service_file:
|
||||
service_file.writelines(lines.format(name, number))
|
||||
except FileNotFoundError:
|
||||
return
|
||||
|
||||
@ -126,7 +126,7 @@ def subcommand_enable_api_access(_):
|
||||
def subcommand_dump_database(_):
|
||||
"""Dump database to file."""
|
||||
os.makedirs(os.path.dirname(DB_BACKUP_FILE), exist_ok=True)
|
||||
with open(DB_BACKUP_FILE, 'w') as db_backup_file:
|
||||
with open(DB_BACKUP_FILE, 'w', encoding='utf-8') as db_backup_file:
|
||||
_run_as_postgres(['pg_dump', 'ttrss'], stdout=db_backup_file)
|
||||
|
||||
|
||||
@ -134,7 +134,7 @@ def subcommand_restore_database(_):
|
||||
"""Restore database from file."""
|
||||
_run_as_postgres(['dropdb', 'ttrss'])
|
||||
_run_as_postgres(['createdb', 'ttrss'])
|
||||
with open(DB_BACKUP_FILE, 'r') as db_restore_file:
|
||||
with open(DB_BACKUP_FILE, 'r', encoding='utf-8') as db_restore_file:
|
||||
_run_as_postgres(['psql', '--dbname', 'ttrss'], stdin=db_restore_file)
|
||||
|
||||
|
||||
|
||||
@ -197,14 +197,14 @@ def subcommand_check_auto(_):
|
||||
|
||||
def subcommand_enable_auto(_):
|
||||
"""Enable automatic upgrades"""
|
||||
with open(AUTO_CONF_FILE, 'w') as conffile:
|
||||
with open(AUTO_CONF_FILE, 'w', encoding='utf-8') as conffile:
|
||||
conffile.write('APT::Periodic::Update-Package-Lists "1";\n')
|
||||
conffile.write('APT::Periodic::Unattended-Upgrade "1";\n')
|
||||
|
||||
|
||||
def subcommand_disable_auto(_):
|
||||
"""Disable automatic upgrades"""
|
||||
with open(AUTO_CONF_FILE, 'w') as conffile:
|
||||
with open(AUTO_CONF_FILE, 'w', encoding='utf-8') as conffile:
|
||||
conffile.write('APT::Periodic::Update-Package-Lists "0";\n')
|
||||
conffile.write('APT::Periodic::Unattended-Upgrade "0";\n')
|
||||
|
||||
@ -213,14 +213,14 @@ def subcommand_get_log(_):
|
||||
"""Print the automatic upgrades log."""
|
||||
try:
|
||||
print('==> ' + os.path.basename(LOG_FILE))
|
||||
with open(LOG_FILE, 'r') as file_handle:
|
||||
with open(LOG_FILE, 'r', encoding='utf-8') as file_handle:
|
||||
print(file_handle.read())
|
||||
except IOError:
|
||||
pass
|
||||
|
||||
try:
|
||||
print('==> ' + os.path.basename(DPKG_LOG_FILE))
|
||||
with open(DPKG_LOG_FILE, 'r') as file_handle:
|
||||
with open(DPKG_LOG_FILE, 'r', encoding='utf-8') as file_handle:
|
||||
print(file_handle.read())
|
||||
except IOError:
|
||||
pass
|
||||
@ -261,7 +261,7 @@ deb {protocol}://deb.debian.org/debian {dist}-backports main
|
||||
deb-src {protocol}://deb.debian.org/debian {dist}-backports main
|
||||
'''
|
||||
sources = sources.format(protocol=protocol, dist=dist)
|
||||
with open(sources_list, 'w') as file_handle:
|
||||
with open(sources_list, 'w', encoding='utf-8') as file_handle:
|
||||
file_handle.write(sources)
|
||||
|
||||
|
||||
@ -276,7 +276,8 @@ def _check_and_backports_sources(develop=False):
|
||||
return
|
||||
|
||||
try:
|
||||
with open('/etc/dpkg/origins/default', 'r') as default_origin:
|
||||
with open('/etc/dpkg/origins/default', 'r',
|
||||
encoding='utf-8') as default_origin:
|
||||
matches = [
|
||||
re.match(r'Vendor:\s+Debian', line, flags=re.IGNORECASE)
|
||||
for line in default_origin.readlines()
|
||||
@ -326,9 +327,11 @@ def _add_apt_preferences():
|
||||
'for backports.')
|
||||
else:
|
||||
print(f'Setting apt preferences for {dist}-backports.')
|
||||
with open(base_path / '50freedombox4.pref', 'w') as file_handle:
|
||||
with open(base_path / '50freedombox4.pref', 'w',
|
||||
encoding='utf-8') as file_handle:
|
||||
file_handle.write(APT_PREFERENCES_FREEDOMBOX.format(dist))
|
||||
with open(base_path / '51freedombox-apps.pref', 'w') as file_handle:
|
||||
with open(base_path / '51freedombox-apps.pref', 'w',
|
||||
encoding='utf-8') as file_handle:
|
||||
file_handle.write(APT_PREFERENCES_APPS)
|
||||
|
||||
|
||||
@ -394,10 +397,10 @@ def _check_dist_upgrade(test_upgrade=False) -> Tuple[bool, str]:
|
||||
return (False, 'not-enough-free-space')
|
||||
|
||||
logging.info('Upgrading from %s to %s...', dist, codename)
|
||||
with open(SOURCES_LIST, 'r') as sources_list:
|
||||
with open(SOURCES_LIST, 'r', encoding='utf-8') as sources_list:
|
||||
lines = sources_list.readlines()
|
||||
|
||||
with open(SOURCES_LIST, 'w') as sources_list:
|
||||
with open(SOURCES_LIST, 'w', encoding='utf-8') as sources_list:
|
||||
for line in lines:
|
||||
# E.g. replace 'buster' with 'bullseye'.
|
||||
new_line = line.replace(dist, codename)
|
||||
@ -571,7 +574,8 @@ def subcommand_activate_backports(arguments):
|
||||
|
||||
def _start_dist_upgrade_service():
|
||||
"""Create dist upgrade service and start it."""
|
||||
with open(DIST_UPGRADE_SERVICE_PATH, 'w') as service_file:
|
||||
with open(DIST_UPGRADE_SERVICE_PATH, 'w',
|
||||
encoding='utf-8') as service_file:
|
||||
service_file.write(DIST_UPGRADE_SERVICE)
|
||||
|
||||
service_daemon_reload()
|
||||
|
||||
@ -87,7 +87,7 @@ define('WP_CONTENT_DIR', '/var/lib/wordpress/wp-content');
|
||||
|
||||
define('DISABLE_WP_CRON', true);
|
||||
'''
|
||||
_config_file_path.write_text(config_contents)
|
||||
_config_file_path.write_text(config_contents, encoding='utf-8')
|
||||
|
||||
db_contents = f'''<?php
|
||||
# Created by FreedomBox
|
||||
@ -98,7 +98,7 @@ $dbserver='{db_host}';
|
||||
'''
|
||||
old_umask = os.umask(0o037)
|
||||
try:
|
||||
_db_file_path.write_text(db_contents)
|
||||
_db_file_path.write_text(db_contents, encoding='utf-8')
|
||||
finally:
|
||||
os.umask(old_umask)
|
||||
|
||||
@ -158,7 +158,7 @@ def subcommand_set_public(arguments):
|
||||
def subcommand_dump_database(_):
|
||||
"""Dump database to file."""
|
||||
_db_backup_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
with _db_backup_file.open('w') as file_handle:
|
||||
with _db_backup_file.open('w', encoding='utf-8') as file_handle:
|
||||
subprocess.run([
|
||||
'mysqldump', '--add-drop-database', '--add-drop-table',
|
||||
'--add-drop-trigger', '--user', 'root', '--databases', DB_NAME
|
||||
@ -167,7 +167,7 @@ def subcommand_dump_database(_):
|
||||
|
||||
def subcommand_restore_database(_):
|
||||
"""Restore database from file."""
|
||||
with _db_backup_file.open('r') as file_handle:
|
||||
with _db_backup_file.open('r', encoding='utf-8') as file_handle:
|
||||
subprocess.run(['mysql', '--user', 'root'], stdin=file_handle,
|
||||
check=True)
|
||||
|
||||
|
||||
@ -82,7 +82,9 @@ def subcommand_setup(_):
|
||||
def _get_db_name():
|
||||
"""Return the name of the database configured by dbconfig."""
|
||||
config = configparser.ConfigParser()
|
||||
config.read_file(open('/etc/zoph.ini', 'r'))
|
||||
with open('/etc/zoph.ini', 'r', encoding='utf-8') as file_handle:
|
||||
config.read_file(file_handle)
|
||||
|
||||
return config['zoph']['db_name'].strip('"')
|
||||
|
||||
|
||||
@ -120,7 +122,7 @@ def subcommand_dump_database(_):
|
||||
"""Dump database to file."""
|
||||
db_name = _get_db_name()
|
||||
os.makedirs(os.path.dirname(DB_BACKUP_FILE), exist_ok=True)
|
||||
with open(DB_BACKUP_FILE, 'w') as db_backup_file:
|
||||
with open(DB_BACKUP_FILE, 'w', encoding='utf-8') as db_backup_file:
|
||||
subprocess.run(['mysqldump', db_name], stdout=db_backup_file,
|
||||
check=True)
|
||||
|
||||
@ -130,7 +132,7 @@ def subcommand_restore_database(_):
|
||||
db_name = _get_db_name()
|
||||
subprocess.run(['mysqladmin', '--force', 'drop', db_name], check=False)
|
||||
subprocess.run(['mysqladmin', 'create', db_name], check=True)
|
||||
with open(DB_BACKUP_FILE, 'r') as db_restore_file:
|
||||
with open(DB_BACKUP_FILE, 'r', encoding='utf-8') as db_restore_file:
|
||||
subprocess.run(['mysql', db_name], stdin=db_restore_file, check=True)
|
||||
|
||||
|
||||
|
||||
@ -460,7 +460,7 @@ def _extract_image(compressed_file):
|
||||
|
||||
logger.info('Decompressing file %s', compressed_file)
|
||||
partial_file = compressed_file.with_suffix('.partial')
|
||||
with partial_file.open('w') as file_handle:
|
||||
with partial_file.open('w', encoding='utf-8') as file_handle:
|
||||
subprocess.run([
|
||||
'xz', '--verbose', '--decompress', '--keep', '--to-stdout',
|
||||
str(compressed_file)
|
||||
|
||||
@ -185,7 +185,7 @@ def get_custom_shortcuts():
|
||||
continue
|
||||
|
||||
logger.info('Loading custom shortcuts from %s', file_path)
|
||||
with file_path.open() as file_handle:
|
||||
with file_path.open(encoding='utf-8') as file_handle:
|
||||
shortcuts['shortcuts'] += json.load(file_handle)['shortcuts']
|
||||
except Exception as exception:
|
||||
logger.warning('Error loading shortcuts from %s: %s', file_path,
|
||||
|
||||
@ -95,7 +95,7 @@ def _backup_handler(packet, encryption_passphrase=None):
|
||||
'backup': component.manifest
|
||||
} for component in packet.components]
|
||||
}
|
||||
with open(manifest_path, 'w') as manifest_file:
|
||||
with open(manifest_path, 'w', encoding='utf-8') as manifest_file:
|
||||
json.dump(manifests, manifest_file)
|
||||
|
||||
paths = packet.directories + packet.files
|
||||
|
||||
@ -110,7 +110,8 @@ def test_add_repository_when_directory_exists_and_not_empty(
|
||||
temp_user, temp_home, password):
|
||||
remote_path = os.path.join(temp_home, 'non_empty_dir')
|
||||
os.makedirs(remote_path)
|
||||
open(os.path.join(remote_path, 'somefile.txt'), 'w').close()
|
||||
open(os.path.join(remote_path, 'somefile.txt'), 'w',
|
||||
encoding='utf-8').close()
|
||||
data = {
|
||||
'repository': f'{temp_user}@localhost:{remote_path}',
|
||||
'ssh_password': password,
|
||||
|
||||
@ -381,7 +381,7 @@ class VerifySshHostkeyView(SuccessMessageMixin, FormView):
|
||||
known_hosts_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
known_hosts_path.touch()
|
||||
|
||||
with known_hosts_path.open('a') as known_hosts_file:
|
||||
with known_hosts_path.open('a', encoding='utf-8') as known_hosts_file:
|
||||
known_hosts_file.write(ssh_public_key + '\n')
|
||||
|
||||
def get(self, *args, **kwargs):
|
||||
|
||||
@ -117,7 +117,7 @@ def force_upgrade(helper, _packages):
|
||||
|
||||
def get_config():
|
||||
"""Get current configuration"""
|
||||
data = [line.strip() for line in open(CONFIG_FILE, 'r')]
|
||||
data = [line.strip() for line in open(CONFIG_FILE, 'r', encoding='utf-8')]
|
||||
|
||||
forwarders = ''
|
||||
dnssec_enabled = False
|
||||
@ -141,8 +141,8 @@ def get_config():
|
||||
def set_forwarders(forwarders):
|
||||
"""Set DNS forwarders."""
|
||||
flag = 0
|
||||
data = [line.strip() for line in open(CONFIG_FILE, 'r')]
|
||||
conf_file = open(CONFIG_FILE, 'w')
|
||||
data = [line.strip() for line in open(CONFIG_FILE, 'r', encoding='utf-8')]
|
||||
conf_file = open(CONFIG_FILE, 'w', encoding='utf-8')
|
||||
for line in data:
|
||||
if re.match(r'^\s*forwarders\s+{', line):
|
||||
conf_file.write(line + '\n')
|
||||
@ -160,10 +160,10 @@ def set_forwarders(forwarders):
|
||||
|
||||
def set_dnssec(choice):
|
||||
"""Enable or disable DNSSEC."""
|
||||
data = [line.strip() for line in open(CONFIG_FILE, 'r')]
|
||||
data = [line.strip() for line in open(CONFIG_FILE, 'r', encoding='utf-8')]
|
||||
|
||||
if choice == 'enable':
|
||||
conf_file = open(CONFIG_FILE, 'w')
|
||||
conf_file = open(CONFIG_FILE, 'w', encoding='utf-8')
|
||||
for line in data:
|
||||
if re.match(r'//\s*dnssec-enable\s+yes;', line):
|
||||
line = line.lstrip('/')
|
||||
@ -171,7 +171,7 @@ def set_dnssec(choice):
|
||||
conf_file.close()
|
||||
|
||||
if choice == 'disable':
|
||||
conf_file = open(CONFIG_FILE, 'w')
|
||||
conf_file = open(CONFIG_FILE, 'w', encoding='utf-8')
|
||||
for line in data:
|
||||
if re.match(r'^\s*dnssec-enable\s+yes;', line):
|
||||
line = '//' + line
|
||||
|
||||
@ -4,6 +4,7 @@ FreedomBox app for configuring date and time.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import pathlib
|
||||
|
||||
from django.contrib import messages
|
||||
from django.utils.translation import gettext as _
|
||||
@ -17,6 +18,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DateTimeAppView(AppView):
|
||||
"""Serve configuration page."""
|
||||
form_class = DateTimeForm
|
||||
app_id = 'datetime'
|
||||
|
||||
@ -25,9 +27,11 @@ class DateTimeAppView(AppView):
|
||||
status['time_zone'] = self.get_current_time_zone()
|
||||
return status
|
||||
|
||||
def get_current_time_zone(self):
|
||||
@staticmethod
|
||||
def get_current_time_zone():
|
||||
"""Get current time zone."""
|
||||
time_zone = open('/etc/timezone').read().rstrip()
|
||||
path = pathlib.Path('/etc/timezone')
|
||||
time_zone = path.read_text(encoding='utf-8').rstip()
|
||||
return time_zone or 'none'
|
||||
|
||||
def form_valid(self, form):
|
||||
|
||||
@ -16,6 +16,7 @@ _JSON_FORMAT = {'indent': 4, 'sort_keys': True, 'ensure_ascii': False}
|
||||
|
||||
class Config:
|
||||
"""Read or edit a Deluge configuration file."""
|
||||
|
||||
def __init__(self, file_name):
|
||||
"""Initialize the configuration object."""
|
||||
self.file_name = file_name
|
||||
@ -26,7 +27,7 @@ class Config:
|
||||
|
||||
def load(self):
|
||||
"""Parse the configuration file into memory."""
|
||||
text = self.file.read_text()
|
||||
text = self.file.read_text(encoding='utf-8')
|
||||
matches = re.match(r'^({[^}]*})(.*)$', text, re.DOTALL)
|
||||
if not matches:
|
||||
raise Exception('Unexpected file format.')
|
||||
|
||||
@ -61,7 +61,8 @@ def action_set_domains(primary_domain, all_domains):
|
||||
'mydestination': my_destination
|
||||
}
|
||||
postfix.set_config(conf)
|
||||
pathlib.Path('/etc/mailname').write_text(primary_domain + '\n')
|
||||
pathlib.Path('/etc/mailname').write_text(primary_domain + '\n',
|
||||
encoding='utf-8')
|
||||
tls.set_postfix_config(primary_domain, all_domains)
|
||||
tls.set_dovecot_config(primary_domain, all_domains)
|
||||
|
||||
|
||||
@ -80,4 +80,4 @@ local_name {domain} {{
|
||||
}}
|
||||
'''
|
||||
cert_config = pathlib.Path('/etc/dovecot/conf.d/91-freedombox-tls.conf')
|
||||
cert_config.write_text(content)
|
||||
cert_config.write_text(content, encoding='utf-8')
|
||||
|
||||
@ -23,7 +23,7 @@ class FirstbootWizardSecretForm(forms.Form):
|
||||
generated during installation.
|
||||
"""
|
||||
secret_file_path = first_boot.get_secret_file_path()
|
||||
with open(secret_file_path) as secret_file:
|
||||
with open(secret_file_path, encoding='utf-8') as secret_file:
|
||||
if secret != secret_file.read().strip():
|
||||
self.add_error('secret', 'Invalid secret')
|
||||
|
||||
|
||||
@ -215,7 +215,7 @@ def get_repo_list():
|
||||
|
||||
progress_file = os.path.join(GIT_REPO_PATH, repo, 'clone_progress')
|
||||
if os.path.exists(progress_file):
|
||||
with open(progress_file) as file_handle:
|
||||
with open(progress_file, encoding='utf-8') as file_handle:
|
||||
clone_progress = file_handle.read()
|
||||
repo_info['clone_progress'] = clone_progress
|
||||
|
||||
|
||||
@ -185,7 +185,7 @@ def get_configured_domain_name():
|
||||
if not is_setup():
|
||||
return None
|
||||
|
||||
with open(SERVER_NAME_PATH) as config_file:
|
||||
with open(SERVER_NAME_PATH, encoding='utf-8') as config_file:
|
||||
config, _, _ = load_yaml_guess_indent(config_file)
|
||||
|
||||
return config['server_name']
|
||||
@ -196,7 +196,7 @@ def get_turn_configuration() -> (List[str], str, bool):
|
||||
for file_path, managed in ((OVERRIDDEN_TURN_CONF_PATH, False),
|
||||
(TURN_CONF_PATH, True)):
|
||||
if is_non_empty_file(file_path):
|
||||
with open(file_path) as config_file:
|
||||
with open(file_path, encoding='utf-8') as config_file:
|
||||
config, _, _ = load_yaml_guess_indent(config_file)
|
||||
return (TurnConfiguration(None, config['turn_uris'],
|
||||
config['turn_shared_secret']),
|
||||
|
||||
@ -128,7 +128,7 @@ def is_private_mode_enabled():
|
||||
|
||||
def _get_config_value_in_file(setting_name, config_file):
|
||||
"""Return the value of a setting from a config file."""
|
||||
with open(config_file, 'r') as config:
|
||||
with open(config_file, 'r', encoding='utf-8') as config:
|
||||
for line in config:
|
||||
if line.startswith(setting_name):
|
||||
return re.findall(r'["\'][^"\']*["\']', line)[0].strip('"\'')
|
||||
|
||||
@ -122,7 +122,8 @@ def is_setup():
|
||||
def is_using_ecc():
|
||||
"""Return whether the service is using ECC."""
|
||||
if os.path.exists(SERVER_CONFIGURATION_FILE):
|
||||
with open(SERVER_CONFIGURATION_FILE, 'r') as file_handle:
|
||||
with open(SERVER_CONFIGURATION_FILE, 'r',
|
||||
encoding='utf-8') as file_handle:
|
||||
for line in file_handle:
|
||||
if line.strip() == 'dh none':
|
||||
return True
|
||||
|
||||
@ -34,7 +34,7 @@ def fixture_conf_file(tmp_path):
|
||||
def test_identify_rsa_configuration(conf_file):
|
||||
"""Identify RSA configuration based on configuration file."""
|
||||
with patch('plinth.modules.openvpn.SERVER_CONFIGURATION_FILE', conf_file):
|
||||
with open(conf_file, 'w') as file_handle:
|
||||
with open(conf_file, 'w', encoding='utf-8') as file_handle:
|
||||
file_handle.write('dh /etc/openvpn/freedombox-keys/pki/dh.pem')
|
||||
assert not openvpn.is_using_ecc()
|
||||
|
||||
@ -42,7 +42,7 @@ def test_identify_rsa_configuration(conf_file):
|
||||
def test_identify_ecc_configuration(conf_file):
|
||||
"""Identify ECC configuration based on configuration file."""
|
||||
with patch('plinth.modules.openvpn.SERVER_CONFIGURATION_FILE', conf_file):
|
||||
with open(conf_file, 'w') as file_handle:
|
||||
with open(conf_file, 'w', encoding='utf-8') as file_handle:
|
||||
file_handle.write('dh none')
|
||||
assert openvpn.is_using_ecc()
|
||||
|
||||
|
||||
@ -118,7 +118,8 @@ def get_domain():
|
||||
"""Read TLS domain from config file select first available if none."""
|
||||
domain = None
|
||||
try:
|
||||
with open('/var/lib/quassel/domain-freedombox') as file_handle:
|
||||
with open('/var/lib/quassel/domain-freedombox',
|
||||
encoding='utf-8') as file_handle:
|
||||
domain = file_handle.read().strip()
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
@ -80,13 +80,13 @@ def enable_fail2ban():
|
||||
|
||||
def get_restricted_access_enabled():
|
||||
"""Return whether restricted access is enabled"""
|
||||
with open(ACCESS_CONF_FILE_OLD, 'r') as conffile:
|
||||
with open(ACCESS_CONF_FILE_OLD, 'r', encoding='utf-8') as conffile:
|
||||
if any(line.strip() in ACCESS_CONF_SNIPPETS
|
||||
for line in conffile.readlines()):
|
||||
return True
|
||||
|
||||
try:
|
||||
with open(ACCESS_CONF_FILE, 'r') as conffile:
|
||||
with open(ACCESS_CONF_FILE, 'r', encoding='utf-8') as conffile:
|
||||
return any(line.strip() in ACCESS_CONF_SNIPPETS
|
||||
for line in conffile.readlines())
|
||||
except FileNotFoundError:
|
||||
|
||||
@ -97,7 +97,7 @@ def is_newer_version_available():
|
||||
def get_os_release():
|
||||
"""Returns the Debian release number and name."""
|
||||
output = 'Error: Cannot read PRETTY_NAME in /etc/os-release.'
|
||||
with open('/etc/os-release', 'r') as release_file:
|
||||
with open('/etc/os-release', 'r', encoding='utf-8') as release_file:
|
||||
for line in release_file:
|
||||
if 'PRETTY_NAME=' in line:
|
||||
line = line.replace('"', '').strip()
|
||||
|
||||
@ -8,8 +8,8 @@ from unittest.mock import MagicMock, Mock
|
||||
|
||||
import pytest
|
||||
import ruamel.yaml
|
||||
from ruamel.yaml.compat import StringIO
|
||||
from django.test.client import RequestFactory
|
||||
from ruamel.yaml.compat import StringIO
|
||||
|
||||
from plinth.utils import YAMLFile, is_user_admin, is_valid_user_name
|
||||
|
||||
@ -105,7 +105,7 @@ class TestYAMLFileUtil:
|
||||
for key in conf:
|
||||
file_conf[key] = conf[key]
|
||||
|
||||
with open(test_file.name, 'r') as retrieved_conf:
|
||||
with open(test_file.name, 'r', encoding='utf-8') as retrieved_conf:
|
||||
buffer = StringIO()
|
||||
self.yaml.dump(conf, buffer)
|
||||
assert retrieved_conf.read() == buffer.getvalue()
|
||||
@ -116,13 +116,13 @@ class TestYAMLFileUtil:
|
||||
"""
|
||||
test_file = tempfile.NamedTemporaryFile()
|
||||
|
||||
with open(test_file.name, 'w') as conf_file:
|
||||
with open(test_file.name, 'w', encoding='utf-8') as conf_file:
|
||||
self.yaml.dump({'property1': self.kv_pair}, conf_file)
|
||||
|
||||
with YAMLFile(test_file.name) as file_conf:
|
||||
file_conf['property2'] = self.kv_pair
|
||||
|
||||
with open(test_file.name, 'r') as retrieved_conf:
|
||||
with open(test_file.name, 'r', encoding='utf-8') as retrieved_conf:
|
||||
file_conf = self.yaml.load(retrieved_conf)
|
||||
assert file_conf == {
|
||||
'property1': self.kv_pair,
|
||||
@ -138,4 +138,4 @@ class TestYAMLFileUtil:
|
||||
yaml_file['property1'] = 'value1'
|
||||
raise ValueError('Test')
|
||||
|
||||
assert open(test_file.name, 'r').read() == ''
|
||||
assert open(test_file.name, 'r', encoding='utf-8').read() == ''
|
||||
|
||||
@ -106,7 +106,7 @@ class YAMLFile(object):
|
||||
self.yaml.preserve_quotes = True
|
||||
|
||||
def __enter__(self):
|
||||
with open(self.yaml_file, 'r') as intro_conf:
|
||||
with open(self.yaml_file, 'r', encoding='utf-8') as intro_conf:
|
||||
if not self.is_file_empty():
|
||||
self.conf = self.yaml.load(intro_conf)
|
||||
else:
|
||||
@ -116,7 +116,7 @@ class YAMLFile(object):
|
||||
|
||||
def __exit__(self, type_, value, traceback):
|
||||
if not traceback:
|
||||
with open(self.yaml_file, 'w') as intro_conf:
|
||||
with open(self.yaml_file, 'w', encoding='utf-8') as intro_conf:
|
||||
self.yaml.dump(self.conf, intro_conf)
|
||||
|
||||
def is_file_empty(self):
|
||||
@ -140,7 +140,8 @@ def generate_password(size=32):
|
||||
def grep(pattern, file_name):
|
||||
"""Return lines of a file matching a pattern."""
|
||||
return [
|
||||
line.rstrip() for line in open(file_name) if re.search(pattern, line)
|
||||
line.rstrip() for line in open(file_name, encoding='utf-8')
|
||||
if re.search(pattern, line)
|
||||
]
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user