FreedomBox/plinth/tests/test_operation.py
Sunil Mohan Adapa a233bbfd9b
operation: Add unique ID for each operation
- Helps in retrieving an operation that is currently running.

- Prevent starting an operation that is already running.

Tests:

- Unit tests work.

- Installing, uninstalling an app works.

- For upgrading an app works.

- Running background diagnostics works.

- Updating tor configuration works.

- Updating torproxy configuration works.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
2023-10-07 04:52:17 +09:00

361 lines
12 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Test module for Operation and OperationsManager."""
import threading
import time
from collections import OrderedDict
from unittest.mock import Mock, call, patch
import pytest
from plinth import app
from .. import operation as operation_module
from ..notification import Notification
from ..operation import Operation, OperationsManager
class AppTest(app.App):
app_id = 'testapp'
def __init__(self):
super().__init__()
info = app.Info(self.app_id, 1, name='Test App')
self.add(info)
@patch('plinth.operation.Operation._update_notification')
def test_operation_default_initialization(update_notification):
"""Test Operation initialization with default values."""
target = Mock()
operation = Operation('testid', 'testapp', 'op1', target)
assert operation.op_id == 'testid'
assert operation.app_id == 'testapp'
assert operation.name == 'op1'
assert operation.show_message
assert not operation.show_notification
assert operation.target == target
assert operation.args == []
assert operation.kwargs == {}
assert operation.on_complete is None
assert operation.state == Operation.State.WAITING
assert operation._message is None
assert operation.exception is None
assert operation.thread_data == {}
assert isinstance(operation.thread, threading.Thread)
assert operation.thread._operation == operation
assert update_notification.has_calls([call()])
@patch('plinth.operation.Operation._update_notification')
def test_operation_initialization(update_notification):
"""Test Operation initialization with explicit values."""
on_complete = Mock()
operation = Operation('testid', 'testapp', 'op1', Mock(), ['arg1'],
{'arg2': 'value2'}, False, True,
{'data1': 'datavalue1'}, on_complete)
assert not operation.show_message
assert operation.show_notification
assert operation.args == ['arg1']
assert operation.kwargs == {'arg2': 'value2'}
assert operation.on_complete == on_complete
assert operation.state == Operation.State.WAITING
assert operation._message is None
assert operation.exception is None
assert operation.thread_data == {'data1': 'datavalue1'}
assert update_notification.has_calls([call()])
def test_operation_str():
"""Test string representation of operation."""
operation = Operation('testid', 'testapp', 'op1', Mock())
assert str(operation) == 'Operation: testapp: op1'
@patch('plinth.operation.Operation._update_notification')
def test_successful_operation(update_notification):
"""Test running a operation that succeeds."""
target = Mock()
target.return_value = 'test-return'
on_complete = Mock()
operation = Operation('testid', 'testapp', 'op1', target, ['arg1'],
{'arg2': 'value2'}, on_complete=on_complete)
operation.run()
assert operation.join() == 'test-return'
target.assert_has_calls([call('arg1', arg2='value2')])
assert operation.state == Operation.State.COMPLETED
assert operation.return_value == 'test-return'
on_complete.assert_has_calls([call(operation)])
assert update_notification.has_calls([call(), call()])
@patch('plinth.operation.Operation._update_notification')
def test_error_operation(update_notification):
"""Test running an operation that fails."""
target = Mock()
target.side_effect = RuntimeError('error1')
on_complete = Mock()
operation = Operation('testid', 'testapp', 'op1', target, ['arg1'],
{'arg2': 'value2'}, on_complete=on_complete)
operation.run()
with pytest.raises(RuntimeError):
operation.join()
target.assert_has_calls([call('arg1', arg2='value2')])
assert operation.state == Operation.State.COMPLETED
assert operation.exception == target.side_effect
on_complete.assert_has_calls([call(operation)])
assert update_notification.has_calls([call(), call()])
@patch('plinth.operation.Operation._update_notification')
def test_join_before_start(update_notification):
"""Test waiting until operation finishes.."""
event = threading.Event()
operation = Operation('testid', 'testapp', 'op1', Mock)
success = []
def _wait():
"""Wait for operation to start."""
event.set()
operation.join()
success.append(True)
thread = threading.Thread(target=_wait)
thread.start()
event.wait()
time.sleep(0.1) # Ensure that thread is waiting before we start operation.
operation.run()
thread.join()
assert success
@patch('plinth.operation.Operation._update_notification')
def test_join_raises_exception(update_notification):
"""Test that joining raises exception if thread does.."""
target = Mock()
target.side_effect = RuntimeError('error1')
on_complete = Mock()
operation = Operation('testid', 'testapp', 'op1', target, ['arg1'],
{'arg2': 'value2'}, on_complete=on_complete)
operation.run()
with pytest.raises(RuntimeError):
operation.join()
def test_getting_operation_from_thread():
"""Test that operation object can be retread from within the thread."""
def target():
operation = Operation.get_operation()
operation.thread_data['test_operation'] = operation
operation = Operation('testid', 'testapp', 'op1', target)
operation.run()
operation.join()
assert operation.thread_data['test_operation'] == operation
@patch('plinth.operation.Operation._update_notification')
def test_updating_operation(update_notification):
"""Test that operation object can be updated from within the thread."""
exception = RuntimeError('error1')
def target():
operation = Operation.get_operation()
operation.on_update('message1', exception)
operation = Operation('testid', 'testapp', 'op1', target)
operation.run()
with pytest.raises(RuntimeError):
operation.join()
assert operation._message == 'message1'
assert operation.exception == exception
assert update_notification.has_calls([call(), call(), call()])
@patch('plinth.app.App.get')
def test_message(app_get):
"""Test getting the operation's message."""
operation = Operation('testid', 'testapp', 'op1', Mock())
operation._message = 'message1'
operation.exception = RuntimeError('error1')
assert operation.message == 'message1'
assert operation.translated_message == 'message1'
operation._message = None
assert operation.message == 'Error: {name}: {exception_message}'
assert operation.translated_message == 'Error: op1: error1'
operation.exception = None
operation.state = Operation.State.WAITING
assert operation.message == 'Waiting to start: {name}'
assert operation.translated_message == 'Waiting to start: op1'
operation.exception = None
operation.state = Operation.State.RUNNING
assert operation.message == '{name}'
assert operation.translated_message == 'op1'
operation.exception = None
operation.state = Operation.State.COMPLETED
assert operation.message == 'Finished: {name}'
assert operation.translated_message == 'Finished: op1'
@patch('plinth.app.App.get')
@pytest.mark.django_db
def test_update_notification(app_get):
"""Test that operation notification is created."""
app_get.return_value = AppTest()
operation = Operation('testid', 'testapp', 'op1', Mock(),
show_notification=True)
note = Notification.get('testapp-operation')
assert note.id == 'testapp-operation'
assert note.app_id == 'testapp'
assert note.severity == 'info'
assert note.title == 'Test App'
assert note.message == operation.message
assert note.body_template == 'operation-notification.html'
assert note.group == 'admin'
assert not note.dismissed
assert note.data['app_name'] == 'Test App'
assert note.data['app_icon'] is None
assert note.data['app_icon_filename'] is None
assert note.data['state'] == 'waiting'
assert note.data['exception'] is None
assert note.data['name'] == 'translate:op1'
operation.exception = RuntimeError()
operation._update_notification()
note = Notification.get('testapp-operation')
assert note.severity == 'error'
def test_manager_global_instance():
"""Test that single global instance of operation's manager is available."""
assert isinstance(operation_module.manager, OperationsManager)
def test_manager_init():
"""Test initializing operations manager."""
manager = OperationsManager()
assert manager._operations == {}
assert manager._current_operation is None
assert isinstance(manager._lock, threading.RLock().__class__)
def test_manager_new():
"""Test creating a new operation using a manager."""
manager = OperationsManager()
event = threading.Event()
def target():
event.wait()
operation = manager.new('testop', 'testapp', 'op1', target)
assert isinstance(operation, Operation)
assert manager._current_operation == operation
assert manager._operations == {'testop': operation}
event.set()
operation.join()
assert manager._current_operation is None
assert manager._operations == OrderedDict(testop=operation)
def test_manager_new_without_show_message():
"""Test creating an operation that does not show message."""
manager = OperationsManager()
event = threading.Event()
def target():
event.wait()
operation = manager.new('testop', 'testapp', 'op1', target,
show_message=False)
event.set()
operation.join()
assert manager._current_operation is None
assert manager._operations == {}
def test_manager_new_raises():
"""Test that a new operation is always unique."""
manager = OperationsManager()
operation1 = manager.new('testop1', 'testapp', 'op1', Mock())
# Creating operation with same id throws exception
with pytest.raises(KeyError):
manager.new('testop1', 'testapp', 'op1', Mock())
# Creating operation with different ID works
operation2 = manager.new('testop2', 'testapp', 'op3', Mock())
assert manager._operations == OrderedDict(testop1=operation1,
testop2=operation2)
def test_manager_scheduling():
"""Test creating a multiple operations and scheduling them."""
manager = OperationsManager()
event1 = threading.Event()
event2 = threading.Event()
event3 = threading.Event()
operation1 = manager.new('testop1', 'testapp', 'op1', event1.wait)
operation2 = manager.new('testop2', 'testapp', 'op2', event2.wait)
operation3 = manager.new('testop3', 'testapp', 'op3', event3.wait)
def _assert_is_running(current_operation):
assert manager._current_operation == current_operation
assert manager._operations == OrderedDict(testop1=operation1,
testop2=operation2,
testop3=operation3)
for operation in [operation1, operation2, operation3]:
alive = (operation == current_operation)
assert operation.thread.is_alive() == alive
_assert_is_running(operation1)
event1.set()
operation1.join()
_assert_is_running(operation2)
event2.set()
operation2.join()
_assert_is_running(operation3)
event3.set()
operation3.join()
def test_manager_filter():
"""Test returning filtered operations."""
manager = OperationsManager()
operation1 = manager.new('testop1', 'testapp1', 'op1', Mock())
operation2 = manager.new('testop2', 'testapp1', 'op2', Mock())
operation3 = manager.new('testop3', 'testapp2', 'op3', Mock())
manager.filter('testapp1') == [operation1, operation2]
manager.filter('testapp2') == [operation3]
def test_manager_collect_results():
"""Test collecting results from the manager."""
manager = OperationsManager()
event = threading.Event()
operation1 = manager.new('testop1', 'testapp1', 'op1', Mock())
operation2 = manager.new('testop2', 'testapp2', 'op2', Mock())
operation3 = manager.new('testop3', 'testapp1', 'op3', event.wait)
operation1.join()
operation2.join()
assert manager.collect_results('testapp1') == [operation1]
assert manager._operations == OrderedDict(testop2=operation2,
testop3=operation3)
event.set()
operation3.join()
assert manager.collect_results('testapp1') == [operation3]
assert manager._operations == OrderedDict(testop2=operation2)