operation: Add module to manage threaded operations

- Show a Django message if desired. Keep the operation after completion so that
the message can be collected later.

- Show notifications for running operations

  - Only if show_notification flag is set.

  - Use a custom template so that spinner can be shown.

- Log generously for operation creation, scheduling, running and completion.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Sunil Mohan Adapa 2022-07-28 17:08:57 -07:00 committed by James Valleroy
parent c6b69b2f41
commit cbef3e0163
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
3 changed files with 606 additions and 0 deletions

244
plinth/operation.py Normal file
View File

@ -0,0 +1,244 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Utilities to run operations and show their progress or failures."""
import enum
import logging
import threading
from typing import Callable, Optional
from . import app as app_module
logger = logging.getLogger(__name__)
class Operation:
"""Represent an ongoing or finished activity."""
class State(enum.Enum):
"""Various states of an operation."""
WAITING: str = 'waiting'
RUNNING: str = 'running'
COMPLETED: str = 'completed'
def __init__(self, app_id: str, name: str, target: Callable,
args: Optional[list] = None, kwargs: Optional[dict] = None,
show_message: bool = True, show_notification: bool = False,
thread_data: Optional[dict] = None,
on_complete: Callable = None):
"""Initialize to no operation."""
self.app_id = app_id
self.name = name
self.show_message = show_message
self.show_notification = show_notification
self.target = target
self.args = args or []
self.kwargs = kwargs or {}
self.on_complete = on_complete
self.state = Operation.State.WAITING
self.return_value = None
self._message: Optional[str] = None
self.exception: Optional[Exception] = None
# Operation specific data
self.thread_data: dict = thread_data or {}
self.thread = threading.Thread(target=self._catch_thread_errors)
self.start_event = threading.Event()
setattr(self.thread, '_operation', self)
self._update_notification()
def __str__(self):
"""Return a string representation of the operation."""
return f'Operation: {self.app_id}: {self.name}'
def _catch_thread_errors(self):
"""Collect exceptions when running in a thread."""
self._update_notification()
try:
self.return_value = self.target(*self.args, **self.kwargs)
except Exception as exception:
logger.exception('Error: %s, %s', self, exception)
self.exception = exception
finally:
self.state = Operation.State.COMPLETED
self._update_notification()
# Notify
if self.on_complete:
self.on_complete(self)
def run(self):
"""Run a specified operation in a thread."""
logger.info('%s: running', str(self))
self.state = Operation.State.RUNNING
self.thread.start()
self.start_event.set()
def join(self):
"""Block the current thread until the operation is completed.
Raise an exception if the thread encountered an exception.
"""
self.start_event.wait()
self.thread.join()
if self.exception:
raise self.exception
return self.return_value
@staticmethod
def get_operation():
"""Return the operation associated with this thread."""
thread = threading.current_thread()
return thread._operation
def on_update(self, message: Optional[str] = None,
exception: Optional[Exception] = None):
"""Call from within the thread to update the progress of operation."""
if message:
self._message = message
if exception:
self.exception = exception
self._update_notification()
@property
def message(self):
"""Return a message about status of the operation."""
from django.utils.translation import gettext_noop
if self._message: # Progress has been set by the operation itself
return self._message
if self.exception: # Operation resulted in a error.
return gettext_noop('Error: {name}: {exception_message}')
if self.state == Operation.State.WAITING:
return gettext_noop('Waiting to start: {name}')
if self.state == Operation.State.RUNNING:
return '{name}' # No translation needed
if self.state == Operation.State.COMPLETED:
return gettext_noop('Finished: {name}')
@property
def translated_message(self):
"""Return a message about status of operation after translating.
Must be called from a web request (UI) thread with user language set so
that localization is done properly.
"""
from django.utils.translation import gettext
message = gettext(self.message)
message = message.format(name=self.name,
exception_message=str(self.exception))
if self.app_id:
message = message.format(
app_name=app_module.App.get(self.app_id).info.name)
return message
def _update_notification(self):
"""Show an updated notification if needed."""
if not self.show_notification:
return
from plinth.notification import Notification
severity = 'info' if not self.exception else 'error'
app = app_module.App.get(self.app_id)
data = {
'app_name': str(app.info.name),
'app_icon': app.info.icon,
'app_icon_filename': app.info.icon_filename,
'state': self.state.value,
'exception': str(self.exception) if self.exception else None,
'name': 'translate:' + str(self.name),
}
Notification.update_or_create(
id=self.app_id + '-operation', app_id=self.app_id,
severity=severity, title=app.info.name, message=self.message,
body_template='operation-notification.html', data=data,
group='admin', dismissed=False)
class OperationsManager:
"""Global handler for all operations and their results."""
def __init__(self):
"""Initialize the object."""
self._operations: list[Operation] = []
self._current_operation: Optional[Operation] = None
# Assume that operations manager will be called from various threads
# including the callback called from the threads it creates. Ensure
# that properties don't get corrupted due to race conditions when
# called from different threads by locking all code that updates them.
# It is re-entrant lock, meaning it can be re-obtained without blocking
# when done from the same thread which holds the lock.
self._lock = threading.RLock()
def new(self, *args, **kwargs):
"""Create a new operation instance and add to global list."""
with self._lock:
operation = Operation(*args, **kwargs,
on_complete=self._on_operation_complete)
self._operations.append(operation)
logger.info('%s: added', operation)
self._schedule_next()
return operation
def _on_operation_complete(self, operation):
"""Trigger next operation. Called from within previous thread."""
logger.debug('%s: on_complete called', operation)
with self._lock:
self._current_operation = None
if not operation.show_message:
# No need to keep it lingering for later collection
self._operations.remove(operation)
self._schedule_next()
def _schedule_next(self):
"""Schedule the next available operation."""
with self._lock:
if self._current_operation:
return
for operation in self._operations:
if operation.state == Operation.State.WAITING:
logger.debug('%s: scheduling', operation)
self._current_operation = operation
operation.run()
break
def filter(self, app_id):
"""Return operations matching a pattern."""
with self._lock:
return [
operation for operation in self._operations
if operation.app_id == app_id
]
def collect_results(self, app_id):
"""Return the finished operations for an app."""
results: list[Operation] = []
remaining: list[Operation] = []
with self._lock:
for operation in self._operations:
if (operation.app_id == app_id
and operation.state == Operation.State.COMPLETED):
results.append(operation)
else:
remaining.append(operation)
if results:
self._operations = remaining
return results
manager = OperationsManager()

View File

@ -0,0 +1,26 @@
{% comment %}
# SPDX-License-Identifier: AGPL-3.0-or-later
{% endcomment %}
{% load i18n %}
{% load static %}
<p>
{% if data.state == "waiting" %}
<span class="fa fa-clock-o"></span>
{% elif data.state == "running" %}
<span class="fa fa-refresh fa-spin processing"></span>
{% elif data.state == "completed" %}
{% endif %}
{{ message }}
</p>
{% if data.state == "completed" %}
<p>
<a href="{% url 'notification_dismiss' id=id %}?next={{ request.path|iriencode }}"
role="button" class="btn btn-default">
{% trans "Dismiss" %}
</a>
</p>
{% endif %}

View File

@ -0,0 +1,336 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Test module for Operation and OperationsManager."""
import threading
import time
from unittest.mock import Mock, call, patch
import pytest
from plinth import app
from .. import operation as operation_module
from ..notification import Notification
from ..operation import Operation, OperationsManager
class TestApp(app.App):
app_id = 'testapp'
def __init__(self):
super().__init__()
info = app.Info(self.app_id, 1, name='Test App')
self.add(info)
@patch('plinth.operation.Operation._update_notification')
def test_operation_default_initialization(update_notification):
"""Test Operation initialization with default values."""
target = Mock()
operation = Operation('testapp', 'op1', target)
assert operation.app_id == 'testapp'
assert operation.name == 'op1'
assert operation.show_message
assert not operation.show_notification
assert operation.target == target
assert operation.args == []
assert operation.kwargs == {}
assert operation.on_complete is None
assert operation.state == Operation.State.WAITING
assert operation._message is None
assert operation.exception is None
assert operation.thread_data == {}
assert isinstance(operation.thread, threading.Thread)
assert operation.thread._operation == operation
assert update_notification.has_calls([call()])
@patch('plinth.operation.Operation._update_notification')
def test_operation_initialization(update_notification):
"""Test Operation initialization with explicit values."""
on_complete = Mock()
operation = Operation('testapp', 'op1', Mock(), ['arg1'],
{'arg2': 'value2'}, False, True,
{'data1': 'datavalue1'}, on_complete)
assert not operation.show_message
assert operation.show_notification
assert operation.args == ['arg1']
assert operation.kwargs == {'arg2': 'value2'}
assert operation.on_complete == on_complete
assert operation.state == Operation.State.WAITING
assert operation._message is None
assert operation.exception is None
assert operation.thread_data == {'data1': 'datavalue1'}
assert update_notification.has_calls([call()])
def test_operation_str():
"""Test string representation of operation."""
operation = Operation('testapp', 'op1', Mock())
assert str(operation) == 'Operation: testapp: op1'
@patch('plinth.operation.Operation._update_notification')
def test_successful_operation(update_notification):
"""Test running a operation that succeeds."""
target = Mock()
target.return_value = 'test-return'
on_complete = Mock()
operation = Operation('testapp', 'op1', target, ['arg1'],
{'arg2': 'value2'}, on_complete=on_complete)
operation.run()
assert operation.join() == 'test-return'
target.assert_has_calls([call('arg1', arg2='value2')])
assert operation.state == Operation.State.COMPLETED
assert operation.return_value == 'test-return'
on_complete.assert_has_calls([call(operation)])
assert update_notification.has_calls([call(), call()])
@patch('plinth.operation.Operation._update_notification')
def test_error_operation(update_notification):
"""Test running an operation that fails."""
target = Mock()
target.side_effect = RuntimeError('error1')
on_complete = Mock()
operation = Operation('testapp', 'op1', target, ['arg1'],
{'arg2': 'value2'}, on_complete=on_complete)
operation.run()
with pytest.raises(RuntimeError):
operation.join()
target.assert_has_calls([call('arg1', arg2='value2')])
assert operation.state == Operation.State.COMPLETED
assert operation.exception == target.side_effect
on_complete.assert_has_calls([call(operation)])
assert update_notification.has_calls([call(), call()])
@patch('plinth.operation.Operation._update_notification')
def test_join_before_start(update_notification):
"""Test waiting until operation finishes.."""
event = threading.Event()
operation = Operation('testapp', 'op1', Mock)
success = []
def _wait():
"""Wait for operation to start."""
event.set()
operation.join()
success.append(True)
thread = threading.Thread(target=_wait)
thread.start()
event.wait()
time.sleep(0.1) # Ensure that thread is waiting before we start operation.
operation.run()
thread.join()
assert success
@patch('plinth.operation.Operation._update_notification')
def test_join_raises_exception(update_notification):
"""Test that joining raises exception if thread does.."""
target = Mock()
target.side_effect = RuntimeError('error1')
on_complete = Mock()
operation = Operation('testapp', 'op1', target, ['arg1'],
{'arg2': 'value2'}, on_complete=on_complete)
operation.run()
with pytest.raises(RuntimeError):
operation.join()
def test_getting_operation_from_thread():
"""Test that operation object can be retread from within the thread."""
def target():
operation = Operation.get_operation()
operation.thread_data['test_operation'] = operation
operation = Operation('testapp', 'op1', target)
operation.run()
operation.join()
assert operation.thread_data['test_operation'] == operation
@patch('plinth.operation.Operation._update_notification')
def test_updating_operation(update_notification):
"""Test that operation object can be updated from within the thread."""
exception = RuntimeError('error1')
def target():
operation = Operation.get_operation()
operation.on_update('message1', exception)
operation = Operation('testapp', 'op1', target)
operation.run()
with pytest.raises(RuntimeError):
operation.join()
assert operation._message == 'message1'
assert operation.exception == exception
assert update_notification.has_calls([call(), call(), call()])
@patch('plinth.app.App.get')
def test_message(app_get):
"""Test getting the operation's message."""
operation = Operation('testapp', 'op1', Mock())
operation._message = 'message1'
operation.exception = RuntimeError('error1')
assert operation.message == 'message1'
assert operation.translated_message == 'message1'
operation._message = None
assert operation.message == 'Error: {name}: {exception_message}'
assert operation.translated_message == 'Error: op1: error1'
operation.exception = None
operation.state = Operation.State.WAITING
assert operation.message == 'Waiting to start: {name}'
assert operation.translated_message == 'Waiting to start: op1'
operation.exception = None
operation.state = Operation.State.RUNNING
assert operation.message == '{name}'
assert operation.translated_message == 'op1'
operation.exception = None
operation.state = Operation.State.COMPLETED
assert operation.message == 'Finished: {name}'
assert operation.translated_message == 'Finished: op1'
@patch('plinth.app.App.get')
@pytest.mark.django_db
def test_update_notification(app_get):
"""Test that operation notification is created."""
app_get.return_value = TestApp()
operation = Operation('testapp', 'op1', Mock(), show_notification=True)
note = Notification.get('testapp-operation')
assert note.id == 'testapp-operation'
assert note.app_id == 'testapp'
assert note.severity == 'info'
assert note.title == 'Test App'
assert note.message == operation.message
assert note.body_template == 'operation-notification.html'
assert note.group == 'admin'
assert not note.dismissed
assert note.data['app_name'] == 'Test App'
assert note.data['app_icon'] is None
assert note.data['app_icon_filename'] is None
assert note.data['state'] == 'waiting'
assert note.data['exception'] is None
assert note.data['name'] == 'translate:op1'
operation.exception = RuntimeError()
operation._update_notification()
note = Notification.get('testapp-operation')
assert note.severity == 'error'
def test_manager_global_instance():
"""Test that single global instance of operation's manager is available."""
assert isinstance(operation_module.manager, OperationsManager)
def test_manager_init():
"""Test initializing operations manager."""
manager = OperationsManager()
assert manager._operations == []
assert manager._current_operation is None
assert isinstance(manager._lock, threading.RLock().__class__)
def test_manager_new():
"""Test creating a new operation using a manager."""
manager = OperationsManager()
event = threading.Event()
def target():
event.wait()
operation = manager.new('testapp', 'op1', target)
assert isinstance(operation, Operation)
assert manager._current_operation == operation
assert manager._operations == [operation]
event.set()
operation.join()
assert manager._current_operation is None
assert manager._operations == [operation]
def test_manager_new_without_show_message():
"""Test creating an operation that does not show message."""
manager = OperationsManager()
event = threading.Event()
def target():
event.wait()
operation = manager.new('testapp', 'op1', target, show_message=False)
event.set()
operation.join()
assert manager._current_operation is None
assert manager._operations == []
def test_manager_scheduling():
"""Test creating a multiple operations and scheduling them."""
manager = OperationsManager()
event1 = threading.Event()
event2 = threading.Event()
event3 = threading.Event()
operation1 = manager.new('testapp', 'op1', event1.wait)
operation2 = manager.new('testapp', 'op2', event2.wait)
operation3 = manager.new('testapp', 'op3', event3.wait)
def _assert_is_running(current_operation):
assert manager._current_operation == current_operation
assert manager._operations == [operation1, operation2, operation3]
for operation in [operation1, operation2, operation3]:
alive = (operation == current_operation)
assert operation.thread.is_alive() == alive
_assert_is_running(operation1)
event1.set()
operation1.join()
_assert_is_running(operation2)
event2.set()
operation2.join()
_assert_is_running(operation3)
event3.set()
operation3.join()
def test_manager_filter():
"""Test returning filtered operations."""
manager = OperationsManager()
operation1 = manager.new('testapp1', 'op1', Mock())
operation2 = manager.new('testapp1', 'op2', Mock())
operation3 = manager.new('testapp2', 'op3', Mock())
manager.filter('testapp1') == [operation1, operation2]
manager.filter('testapp2') == [operation3]
def test_manager_collect_results():
"""Test collecting results from the manager."""
manager = OperationsManager()
event = threading.Event()
operation1 = manager.new('testapp1', 'op1', Mock())
operation2 = manager.new('testapp2', 'op2', Mock())
operation3 = manager.new('testapp1', 'op3', event.wait)
operation1.join()
operation2.join()
assert manager.collect_results('testapp1') == [operation1]
assert manager._operations == [operation2, operation3]
event.set()
operation3.join()
assert manager.collect_results('testapp1') == [operation3]
assert manager._operations == [operation2]