diff --git a/plinth/operation.py b/plinth/operation.py new file mode 100644 index 000000000..ddb5f2522 --- /dev/null +++ b/plinth/operation.py @@ -0,0 +1,244 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""Utilities to run operations and show their progress or failures.""" + +import enum +import logging +import threading +from typing import Callable, Optional + +from . import app as app_module + +logger = logging.getLogger(__name__) + + +class Operation: + """Represent an ongoing or finished activity.""" + + class State(enum.Enum): + """Various states of an operation.""" + + WAITING: str = 'waiting' + RUNNING: str = 'running' + COMPLETED: str = 'completed' + + def __init__(self, app_id: str, name: str, target: Callable, + args: Optional[list] = None, kwargs: Optional[dict] = None, + show_message: bool = True, show_notification: bool = False, + thread_data: Optional[dict] = None, + on_complete: Callable = None): + """Initialize to no operation.""" + self.app_id = app_id + self.name = name + self.show_message = show_message + self.show_notification = show_notification + + self.target = target + self.args = args or [] + self.kwargs = kwargs or {} + self.on_complete = on_complete + + self.state = Operation.State.WAITING + self.return_value = None + self._message: Optional[str] = None + self.exception: Optional[Exception] = None + + # Operation specific data + self.thread_data: dict = thread_data or {} + + self.thread = threading.Thread(target=self._catch_thread_errors) + self.start_event = threading.Event() + setattr(self.thread, '_operation', self) + self._update_notification() + + def __str__(self): + """Return a string representation of the operation.""" + return f'Operation: {self.app_id}: {self.name}' + + def _catch_thread_errors(self): + """Collect exceptions when running in a thread.""" + self._update_notification() + try: + self.return_value = self.target(*self.args, **self.kwargs) + except Exception as exception: + logger.exception('Error: %s, %s', self, exception) + self.exception = exception + finally: + self.state = Operation.State.COMPLETED + self._update_notification() + # Notify + if self.on_complete: + self.on_complete(self) + + def run(self): + """Run a specified operation in a thread.""" + logger.info('%s: running', str(self)) + self.state = Operation.State.RUNNING + self.thread.start() + self.start_event.set() + + def join(self): + """Block the current thread until the operation is completed. + + Raise an exception if the thread encountered an exception. + """ + self.start_event.wait() + self.thread.join() + if self.exception: + raise self.exception + + return self.return_value + + @staticmethod + def get_operation(): + """Return the operation associated with this thread.""" + thread = threading.current_thread() + return thread._operation + + def on_update(self, message: Optional[str] = None, + exception: Optional[Exception] = None): + """Call from within the thread to update the progress of operation.""" + if message: + self._message = message + + if exception: + self.exception = exception + + self._update_notification() + + @property + def message(self): + """Return a message about status of the operation.""" + from django.utils.translation import gettext_noop + if self._message: # Progress has been set by the operation itself + return self._message + + if self.exception: # Operation resulted in a error. + return gettext_noop('Error: {name}: {exception_message}') + + if self.state == Operation.State.WAITING: + return gettext_noop('Waiting to start: {name}') + + if self.state == Operation.State.RUNNING: + return '{name}' # No translation needed + + if self.state == Operation.State.COMPLETED: + return gettext_noop('Finished: {name}') + + @property + def translated_message(self): + """Return a message about status of operation after translating. + + Must be called from a web request (UI) thread with user language set so + that localization is done properly. + """ + from django.utils.translation import gettext + message = gettext(self.message) + message = message.format(name=self.name, + exception_message=str(self.exception)) + if self.app_id: + message = message.format( + app_name=app_module.App.get(self.app_id).info.name) + + return message + + def _update_notification(self): + """Show an updated notification if needed.""" + if not self.show_notification: + return + + from plinth.notification import Notification + severity = 'info' if not self.exception else 'error' + app = app_module.App.get(self.app_id) + data = { + 'app_name': str(app.info.name), + 'app_icon': app.info.icon, + 'app_icon_filename': app.info.icon_filename, + 'state': self.state.value, + 'exception': str(self.exception) if self.exception else None, + 'name': 'translate:' + str(self.name), + } + Notification.update_or_create( + id=self.app_id + '-operation', app_id=self.app_id, + severity=severity, title=app.info.name, message=self.message, + body_template='operation-notification.html', data=data, + group='admin', dismissed=False) + + +class OperationsManager: + """Global handler for all operations and their results.""" + + def __init__(self): + """Initialize the object.""" + self._operations: list[Operation] = [] + self._current_operation: Optional[Operation] = None + + # Assume that operations manager will be called from various threads + # including the callback called from the threads it creates. Ensure + # that properties don't get corrupted due to race conditions when + # called from different threads by locking all code that updates them. + # It is re-entrant lock, meaning it can be re-obtained without blocking + # when done from the same thread which holds the lock. + self._lock = threading.RLock() + + def new(self, *args, **kwargs): + """Create a new operation instance and add to global list.""" + with self._lock: + operation = Operation(*args, **kwargs, + on_complete=self._on_operation_complete) + self._operations.append(operation) + logger.info('%s: added', operation) + self._schedule_next() + return operation + + def _on_operation_complete(self, operation): + """Trigger next operation. Called from within previous thread.""" + logger.debug('%s: on_complete called', operation) + with self._lock: + self._current_operation = None + if not operation.show_message: + # No need to keep it lingering for later collection + self._operations.remove(operation) + + self._schedule_next() + + def _schedule_next(self): + """Schedule the next available operation.""" + with self._lock: + if self._current_operation: + return + + for operation in self._operations: + if operation.state == Operation.State.WAITING: + logger.debug('%s: scheduling', operation) + self._current_operation = operation + operation.run() + break + + def filter(self, app_id): + """Return operations matching a pattern.""" + with self._lock: + return [ + operation for operation in self._operations + if operation.app_id == app_id + ] + + def collect_results(self, app_id): + """Return the finished operations for an app.""" + results: list[Operation] = [] + remaining: list[Operation] = [] + + with self._lock: + for operation in self._operations: + if (operation.app_id == app_id + and operation.state == Operation.State.COMPLETED): + results.append(operation) + else: + remaining.append(operation) + + if results: + self._operations = remaining + + return results + + +manager = OperationsManager() diff --git a/plinth/templates/operation-notification.html b/plinth/templates/operation-notification.html new file mode 100644 index 000000000..21de43cdd --- /dev/null +++ b/plinth/templates/operation-notification.html @@ -0,0 +1,26 @@ +{% comment %} +# SPDX-License-Identifier: AGPL-3.0-or-later +{% endcomment %} + +{% load i18n %} +{% load static %} + +

+ {% if data.state == "waiting" %} + + {% elif data.state == "running" %} + + {% elif data.state == "completed" %} + {% endif %} + + {{ message }} +

+ +{% if data.state == "completed" %} +

+ + {% trans "Dismiss" %} + +

+{% endif %} diff --git a/plinth/tests/test_operation.py b/plinth/tests/test_operation.py new file mode 100644 index 000000000..2991bb346 --- /dev/null +++ b/plinth/tests/test_operation.py @@ -0,0 +1,336 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""Test module for Operation and OperationsManager.""" + +import threading +import time +from unittest.mock import Mock, call, patch + +import pytest + +from plinth import app + +from .. import operation as operation_module +from ..notification import Notification +from ..operation import Operation, OperationsManager + + +class TestApp(app.App): + app_id = 'testapp' + + def __init__(self): + super().__init__() + + info = app.Info(self.app_id, 1, name='Test App') + self.add(info) + + +@patch('plinth.operation.Operation._update_notification') +def test_operation_default_initialization(update_notification): + """Test Operation initialization with default values.""" + target = Mock() + operation = Operation('testapp', 'op1', target) + assert operation.app_id == 'testapp' + assert operation.name == 'op1' + assert operation.show_message + assert not operation.show_notification + assert operation.target == target + assert operation.args == [] + assert operation.kwargs == {} + assert operation.on_complete is None + assert operation.state == Operation.State.WAITING + assert operation._message is None + assert operation.exception is None + assert operation.thread_data == {} + assert isinstance(operation.thread, threading.Thread) + assert operation.thread._operation == operation + assert update_notification.has_calls([call()]) + + +@patch('plinth.operation.Operation._update_notification') +def test_operation_initialization(update_notification): + """Test Operation initialization with explicit values.""" + on_complete = Mock() + operation = Operation('testapp', 'op1', Mock(), ['arg1'], + {'arg2': 'value2'}, False, True, + {'data1': 'datavalue1'}, on_complete) + assert not operation.show_message + assert operation.show_notification + assert operation.args == ['arg1'] + assert operation.kwargs == {'arg2': 'value2'} + assert operation.on_complete == on_complete + assert operation.state == Operation.State.WAITING + assert operation._message is None + assert operation.exception is None + assert operation.thread_data == {'data1': 'datavalue1'} + assert update_notification.has_calls([call()]) + + +def test_operation_str(): + """Test string representation of operation.""" + operation = Operation('testapp', 'op1', Mock()) + assert str(operation) == 'Operation: testapp: op1' + + +@patch('plinth.operation.Operation._update_notification') +def test_successful_operation(update_notification): + """Test running a operation that succeeds.""" + target = Mock() + target.return_value = 'test-return' + on_complete = Mock() + operation = Operation('testapp', 'op1', target, ['arg1'], + {'arg2': 'value2'}, on_complete=on_complete) + operation.run() + assert operation.join() == 'test-return' + target.assert_has_calls([call('arg1', arg2='value2')]) + assert operation.state == Operation.State.COMPLETED + assert operation.return_value == 'test-return' + on_complete.assert_has_calls([call(operation)]) + assert update_notification.has_calls([call(), call()]) + + +@patch('plinth.operation.Operation._update_notification') +def test_error_operation(update_notification): + """Test running an operation that fails.""" + target = Mock() + target.side_effect = RuntimeError('error1') + on_complete = Mock() + operation = Operation('testapp', 'op1', target, ['arg1'], + {'arg2': 'value2'}, on_complete=on_complete) + operation.run() + with pytest.raises(RuntimeError): + operation.join() + + target.assert_has_calls([call('arg1', arg2='value2')]) + assert operation.state == Operation.State.COMPLETED + assert operation.exception == target.side_effect + on_complete.assert_has_calls([call(operation)]) + assert update_notification.has_calls([call(), call()]) + + +@patch('plinth.operation.Operation._update_notification') +def test_join_before_start(update_notification): + """Test waiting until operation finishes..""" + event = threading.Event() + operation = Operation('testapp', 'op1', Mock) + success = [] + + def _wait(): + """Wait for operation to start.""" + event.set() + operation.join() + success.append(True) + + thread = threading.Thread(target=_wait) + thread.start() + event.wait() + time.sleep(0.1) # Ensure that thread is waiting before we start operation. + operation.run() + thread.join() + assert success + + +@patch('plinth.operation.Operation._update_notification') +def test_join_raises_exception(update_notification): + """Test that joining raises exception if thread does..""" + target = Mock() + target.side_effect = RuntimeError('error1') + on_complete = Mock() + operation = Operation('testapp', 'op1', target, ['arg1'], + {'arg2': 'value2'}, on_complete=on_complete) + operation.run() + with pytest.raises(RuntimeError): + operation.join() + + +def test_getting_operation_from_thread(): + """Test that operation object can be retread from within the thread.""" + + def target(): + operation = Operation.get_operation() + operation.thread_data['test_operation'] = operation + + operation = Operation('testapp', 'op1', target) + operation.run() + operation.join() + assert operation.thread_data['test_operation'] == operation + + +@patch('plinth.operation.Operation._update_notification') +def test_updating_operation(update_notification): + """Test that operation object can be updated from within the thread.""" + exception = RuntimeError('error1') + + def target(): + operation = Operation.get_operation() + operation.on_update('message1', exception) + + operation = Operation('testapp', 'op1', target) + operation.run() + with pytest.raises(RuntimeError): + operation.join() + + assert operation._message == 'message1' + assert operation.exception == exception + assert update_notification.has_calls([call(), call(), call()]) + + +@patch('plinth.app.App.get') +def test_message(app_get): + """Test getting the operation's message.""" + operation = Operation('testapp', 'op1', Mock()) + operation._message = 'message1' + operation.exception = RuntimeError('error1') + assert operation.message == 'message1' + assert operation.translated_message == 'message1' + + operation._message = None + assert operation.message == 'Error: {name}: {exception_message}' + assert operation.translated_message == 'Error: op1: error1' + + operation.exception = None + operation.state = Operation.State.WAITING + assert operation.message == 'Waiting to start: {name}' + assert operation.translated_message == 'Waiting to start: op1' + + operation.exception = None + operation.state = Operation.State.RUNNING + assert operation.message == '{name}' + assert operation.translated_message == 'op1' + + operation.exception = None + operation.state = Operation.State.COMPLETED + assert operation.message == 'Finished: {name}' + assert operation.translated_message == 'Finished: op1' + + +@patch('plinth.app.App.get') +@pytest.mark.django_db +def test_update_notification(app_get): + """Test that operation notification is created.""" + app_get.return_value = TestApp() + operation = Operation('testapp', 'op1', Mock(), show_notification=True) + note = Notification.get('testapp-operation') + assert note.id == 'testapp-operation' + assert note.app_id == 'testapp' + assert note.severity == 'info' + assert note.title == 'Test App' + assert note.message == operation.message + assert note.body_template == 'operation-notification.html' + assert note.group == 'admin' + assert not note.dismissed + assert note.data['app_name'] == 'Test App' + assert note.data['app_icon'] is None + assert note.data['app_icon_filename'] is None + assert note.data['state'] == 'waiting' + assert note.data['exception'] is None + assert note.data['name'] == 'translate:op1' + + operation.exception = RuntimeError() + operation._update_notification() + note = Notification.get('testapp-operation') + assert note.severity == 'error' + + +def test_manager_global_instance(): + """Test that single global instance of operation's manager is available.""" + assert isinstance(operation_module.manager, OperationsManager) + + +def test_manager_init(): + """Test initializing operations manager.""" + manager = OperationsManager() + assert manager._operations == [] + assert manager._current_operation is None + assert isinstance(manager._lock, threading.RLock().__class__) + + +def test_manager_new(): + """Test creating a new operation using a manager.""" + manager = OperationsManager() + event = threading.Event() + + def target(): + event.wait() + + operation = manager.new('testapp', 'op1', target) + assert isinstance(operation, Operation) + assert manager._current_operation == operation + assert manager._operations == [operation] + event.set() + operation.join() + assert manager._current_operation is None + assert manager._operations == [operation] + + +def test_manager_new_without_show_message(): + """Test creating an operation that does not show message.""" + manager = OperationsManager() + event = threading.Event() + + def target(): + event.wait() + + operation = manager.new('testapp', 'op1', target, show_message=False) + event.set() + operation.join() + assert manager._current_operation is None + assert manager._operations == [] + + +def test_manager_scheduling(): + """Test creating a multiple operations and scheduling them.""" + manager = OperationsManager() + event1 = threading.Event() + event2 = threading.Event() + event3 = threading.Event() + + operation1 = manager.new('testapp', 'op1', event1.wait) + operation2 = manager.new('testapp', 'op2', event2.wait) + operation3 = manager.new('testapp', 'op3', event3.wait) + + def _assert_is_running(current_operation): + assert manager._current_operation == current_operation + assert manager._operations == [operation1, operation2, operation3] + for operation in [operation1, operation2, operation3]: + alive = (operation == current_operation) + assert operation.thread.is_alive() == alive + + _assert_is_running(operation1) + event1.set() + operation1.join() + + _assert_is_running(operation2) + event2.set() + operation2.join() + + _assert_is_running(operation3) + event3.set() + operation3.join() + + +def test_manager_filter(): + """Test returning filtered operations.""" + manager = OperationsManager() + operation1 = manager.new('testapp1', 'op1', Mock()) + operation2 = manager.new('testapp1', 'op2', Mock()) + operation3 = manager.new('testapp2', 'op3', Mock()) + manager.filter('testapp1') == [operation1, operation2] + manager.filter('testapp2') == [operation3] + + +def test_manager_collect_results(): + """Test collecting results from the manager.""" + manager = OperationsManager() + event = threading.Event() + + operation1 = manager.new('testapp1', 'op1', Mock()) + operation2 = manager.new('testapp2', 'op2', Mock()) + operation3 = manager.new('testapp1', 'op3', event.wait) + operation1.join() + operation2.join() + assert manager.collect_results('testapp1') == [operation1] + assert manager._operations == [operation2, operation3] + event.set() + operation3.join() + assert manager.collect_results('testapp1') == [operation3] + assert manager._operations == [operation2]