mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-02-04 08:13:38 +00:00
When setup is run from the application thread after startup, it continuously tries until it succeeds. However, after making the first attempt, it does not collect the status of the operation keeping the operation object in operation manager. When trying for the second time, trying to create operation with same ID fails since the operation is already present. Fix this by allowing the operation to be recreated if the existing operation has failed. Tests: - Unit tests pass. - Functional tests for bepasty app pass. - Install an app. Create an error in the setup mechanism for an app. Increment is app version number. Start the service and notice that setup of app is attempted and fails. Few seconds later the setup is attempted again and the process continues. Each time the failure is due to fault in the app's setup method rather than operation not being accepted. Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
374 lines
13 KiB
Python
374 lines
13 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""Test module for Operation and OperationsManager."""
|
|
|
|
import threading
|
|
import time
|
|
from collections import OrderedDict
|
|
from unittest.mock import Mock, call, patch
|
|
|
|
import pytest
|
|
|
|
from plinth import app
|
|
|
|
from .. import operation as operation_module
|
|
from ..notification import Notification
|
|
from ..operation import Operation, OperationsManager
|
|
|
|
|
|
class AppTest(app.App):
|
|
app_id = 'testapp'
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
info = app.Info(self.app_id, 1, name='Test App')
|
|
self.add(info)
|
|
|
|
|
|
@patch('plinth.operation.Operation._update_notification')
|
|
def test_operation_default_initialization(update_notification):
|
|
"""Test Operation initialization with default values."""
|
|
target = Mock()
|
|
operation = Operation('testid', 'testapp', 'op1', target)
|
|
assert operation.op_id == 'testid'
|
|
assert operation.app_id == 'testapp'
|
|
assert operation.name == 'op1'
|
|
assert operation.show_message
|
|
assert not operation.show_notification
|
|
assert operation.target == target
|
|
assert operation.args == []
|
|
assert operation.kwargs == {}
|
|
assert operation.on_complete is None
|
|
assert operation.state == Operation.State.WAITING
|
|
assert operation._message is None
|
|
assert operation.exception is None
|
|
assert operation.thread_data == {}
|
|
assert isinstance(operation.thread, threading.Thread)
|
|
assert operation.thread._operation == operation
|
|
assert update_notification.has_calls([call()])
|
|
|
|
|
|
@patch('plinth.operation.Operation._update_notification')
|
|
def test_operation_initialization(update_notification):
|
|
"""Test Operation initialization with explicit values."""
|
|
on_complete = Mock()
|
|
operation = Operation('testid', 'testapp', 'op1', Mock(), ['arg1'],
|
|
{'arg2': 'value2'}, False, True,
|
|
{'data1': 'datavalue1'}, on_complete)
|
|
assert not operation.show_message
|
|
assert operation.show_notification
|
|
assert operation.args == ['arg1']
|
|
assert operation.kwargs == {'arg2': 'value2'}
|
|
assert operation.on_complete == on_complete
|
|
assert operation.state == Operation.State.WAITING
|
|
assert operation._message is None
|
|
assert operation.exception is None
|
|
assert operation.thread_data == {'data1': 'datavalue1'}
|
|
assert update_notification.has_calls([call()])
|
|
|
|
|
|
def test_operation_str():
|
|
"""Test string representation of operation."""
|
|
operation = Operation('testid', 'testapp', 'op1', Mock())
|
|
assert str(operation) == 'Operation: testapp: op1'
|
|
|
|
|
|
@patch('plinth.operation.Operation._update_notification')
|
|
def test_successful_operation(update_notification):
|
|
"""Test running a operation that succeeds."""
|
|
target = Mock()
|
|
target.return_value = 'test-return'
|
|
on_complete = Mock()
|
|
operation = Operation('testid', 'testapp', 'op1', target, ['arg1'],
|
|
{'arg2': 'value2'}, on_complete=on_complete)
|
|
operation.run()
|
|
assert operation.join() == 'test-return'
|
|
target.assert_has_calls([call('arg1', arg2='value2')])
|
|
assert operation.state == Operation.State.COMPLETED
|
|
assert operation.return_value == 'test-return'
|
|
on_complete.assert_has_calls([call(operation)])
|
|
assert update_notification.has_calls([call(), call()])
|
|
|
|
|
|
@patch('plinth.operation.Operation._update_notification')
|
|
def test_error_operation(update_notification):
|
|
"""Test running an operation that fails."""
|
|
target = Mock()
|
|
target.side_effect = RuntimeError('error1')
|
|
on_complete = Mock()
|
|
operation = Operation('testid', 'testapp', 'op1', target, ['arg1'],
|
|
{'arg2': 'value2'}, on_complete=on_complete)
|
|
operation.run()
|
|
with pytest.raises(RuntimeError):
|
|
operation.join()
|
|
|
|
target.assert_has_calls([call('arg1', arg2='value2')])
|
|
assert operation.state == Operation.State.COMPLETED
|
|
assert operation.exception == target.side_effect
|
|
on_complete.assert_has_calls([call(operation)])
|
|
assert update_notification.has_calls([call(), call()])
|
|
|
|
|
|
@patch('plinth.operation.Operation._update_notification')
|
|
def test_join_before_start(update_notification):
|
|
"""Test waiting until operation finishes.."""
|
|
event = threading.Event()
|
|
operation = Operation('testid', 'testapp', 'op1', Mock)
|
|
success = []
|
|
|
|
def _wait():
|
|
"""Wait for operation to start."""
|
|
event.set()
|
|
operation.join()
|
|
success.append(True)
|
|
|
|
thread = threading.Thread(target=_wait)
|
|
thread.start()
|
|
event.wait()
|
|
time.sleep(0.1) # Ensure that thread is waiting before we start operation.
|
|
operation.run()
|
|
thread.join()
|
|
assert success
|
|
|
|
|
|
@patch('plinth.operation.Operation._update_notification')
|
|
def test_join_raises_exception(update_notification):
|
|
"""Test that joining raises exception if thread does.."""
|
|
target = Mock()
|
|
target.side_effect = RuntimeError('error1')
|
|
on_complete = Mock()
|
|
operation = Operation('testid', 'testapp', 'op1', target, ['arg1'],
|
|
{'arg2': 'value2'}, on_complete=on_complete)
|
|
operation.run()
|
|
with pytest.raises(RuntimeError):
|
|
operation.join()
|
|
|
|
|
|
def test_getting_operation_from_thread():
|
|
"""Test that operation object can be retread from within the thread."""
|
|
|
|
def target():
|
|
operation = Operation.get_operation()
|
|
operation.thread_data['test_operation'] = operation
|
|
|
|
operation = Operation('testid', 'testapp', 'op1', target)
|
|
operation.run()
|
|
operation.join()
|
|
assert operation.thread_data['test_operation'] == operation
|
|
|
|
|
|
@patch('plinth.operation.Operation._update_notification')
|
|
def test_updating_operation(update_notification):
|
|
"""Test that operation object can be updated from within the thread."""
|
|
exception = RuntimeError('error1')
|
|
|
|
def target():
|
|
operation = Operation.get_operation()
|
|
operation.on_update('message1', exception)
|
|
|
|
operation = Operation('testid', 'testapp', 'op1', target)
|
|
operation.run()
|
|
with pytest.raises(RuntimeError):
|
|
operation.join()
|
|
|
|
assert operation._message == 'message1'
|
|
assert operation.exception == exception
|
|
assert update_notification.has_calls([call(), call(), call()])
|
|
|
|
|
|
@patch('plinth.app.App.get')
|
|
def test_message(app_get):
|
|
"""Test getting the operation's message."""
|
|
operation = Operation('testid', 'testapp', 'op1', Mock())
|
|
operation._message = 'message1'
|
|
operation.exception = RuntimeError('error1')
|
|
assert operation.message == 'message1'
|
|
assert operation.translated_message == 'message1'
|
|
|
|
operation._message = None
|
|
assert operation.message == 'Error: {name}: {exception_message}'
|
|
assert operation.translated_message == 'Error: op1: error1'
|
|
|
|
operation.exception = None
|
|
operation.state = Operation.State.WAITING
|
|
assert operation.message == 'Waiting to start: {name}'
|
|
assert operation.translated_message == 'Waiting to start: op1'
|
|
|
|
operation.exception = None
|
|
operation.state = Operation.State.RUNNING
|
|
assert operation.message == '{name}'
|
|
assert operation.translated_message == 'op1'
|
|
|
|
operation.exception = None
|
|
operation.state = Operation.State.COMPLETED
|
|
assert operation.message == 'Finished: {name}'
|
|
assert operation.translated_message == 'Finished: op1'
|
|
|
|
|
|
@patch('plinth.app.App.get')
|
|
@pytest.mark.django_db
|
|
def test_update_notification(app_get):
|
|
"""Test that operation notification is created."""
|
|
app_get.return_value = AppTest()
|
|
operation = Operation('testid', 'testapp', 'op1', Mock(),
|
|
show_notification=True)
|
|
note = Notification.get('testapp-operation')
|
|
assert note.id == 'testapp-operation'
|
|
assert note.app_id == 'testapp'
|
|
assert note.severity == 'info'
|
|
assert note.title == 'Test App'
|
|
assert note.message == operation.message
|
|
assert note.body_template == 'operation-notification.html'
|
|
assert note.group == 'admin'
|
|
assert not note.dismissed
|
|
assert note.data['app_name'] == 'Test App'
|
|
assert note.data['app_icon'] is None
|
|
assert note.data['app_icon_filename'] is None
|
|
assert note.data['state'] == 'waiting'
|
|
assert note.data['exception'] is None
|
|
assert note.data['name'] == 'translate:op1'
|
|
|
|
operation.exception = RuntimeError()
|
|
operation._update_notification()
|
|
note = Notification.get('testapp-operation')
|
|
assert note.severity == 'error'
|
|
|
|
|
|
def test_manager_global_instance():
|
|
"""Test that single global instance of operation's manager is available."""
|
|
assert isinstance(operation_module.manager, OperationsManager)
|
|
|
|
|
|
def test_manager_init():
|
|
"""Test initializing operations manager."""
|
|
manager = OperationsManager()
|
|
assert manager._operations == {}
|
|
assert manager._current_operation is None
|
|
assert isinstance(manager._lock, threading.RLock().__class__)
|
|
|
|
|
|
def test_manager_new():
|
|
"""Test creating a new operation using a manager."""
|
|
manager = OperationsManager()
|
|
event = threading.Event()
|
|
|
|
def target():
|
|
event.wait()
|
|
|
|
operation = manager.new('testop', 'testapp', 'op1', target)
|
|
assert isinstance(operation, Operation)
|
|
assert manager._current_operation == operation
|
|
assert manager._operations == {'testop': operation}
|
|
|
|
event.set()
|
|
operation.join()
|
|
assert manager._current_operation is None
|
|
assert manager._operations == OrderedDict(testop=operation)
|
|
|
|
|
|
def test_manager_new_without_show_message():
|
|
"""Test creating an operation that does not show message."""
|
|
manager = OperationsManager()
|
|
event = threading.Event()
|
|
|
|
def target():
|
|
event.wait()
|
|
|
|
operation = manager.new('testop', 'testapp', 'op1', target,
|
|
show_message=False)
|
|
event.set()
|
|
operation.join()
|
|
assert manager._current_operation is None
|
|
assert manager._operations == {}
|
|
|
|
|
|
def test_manager_new_raises():
|
|
"""Test that a new operation is always unique."""
|
|
manager = OperationsManager()
|
|
event1 = threading.Event()
|
|
event2 = threading.Event()
|
|
|
|
operation1 = manager.new('testop1', 'testapp', 'op1',
|
|
lambda: event1.wait())
|
|
|
|
# Creating operation with different ID works
|
|
operation2 = manager.new('testop2', 'testapp', 'op3', lambda: event2.set())
|
|
|
|
assert manager._operations == OrderedDict(testop1=operation1,
|
|
testop2=operation2)
|
|
|
|
# Creating operation with same id while the operation is running/scheduled
|
|
# throws exception
|
|
with pytest.raises(KeyError):
|
|
manager.new('testop1', 'testapp', 'op1', Mock())
|
|
|
|
# Creating operation with same id after the operation is completed works.
|
|
event1.set() # Allow the first operation to complete
|
|
event2.wait(10) # Wait until the second operation has started
|
|
operation1_new = manager.new('testop1', 'testapp', 'op1', Mock())
|
|
|
|
assert manager._operations == OrderedDict(testop1=operation1_new,
|
|
testop2=operation2)
|
|
|
|
|
|
def test_manager_scheduling():
|
|
"""Test creating a multiple operations and scheduling them."""
|
|
manager = OperationsManager()
|
|
event1 = threading.Event()
|
|
event2 = threading.Event()
|
|
event3 = threading.Event()
|
|
|
|
operation1 = manager.new('testop1', 'testapp', 'op1', event1.wait)
|
|
operation2 = manager.new('testop2', 'testapp', 'op2', event2.wait)
|
|
operation3 = manager.new('testop3', 'testapp', 'op3', event3.wait)
|
|
|
|
def _assert_is_running(current_operation):
|
|
assert manager._current_operation == current_operation
|
|
assert manager._operations == OrderedDict(testop1=operation1,
|
|
testop2=operation2,
|
|
testop3=operation3)
|
|
for operation in [operation1, operation2, operation3]:
|
|
alive = (operation == current_operation)
|
|
assert operation.thread.is_alive() == alive
|
|
|
|
_assert_is_running(operation1)
|
|
event1.set()
|
|
operation1.join()
|
|
|
|
_assert_is_running(operation2)
|
|
event2.set()
|
|
operation2.join()
|
|
|
|
_assert_is_running(operation3)
|
|
event3.set()
|
|
operation3.join()
|
|
|
|
|
|
def test_manager_filter():
|
|
"""Test returning filtered operations."""
|
|
manager = OperationsManager()
|
|
operation1 = manager.new('testop1', 'testapp1', 'op1', Mock())
|
|
operation2 = manager.new('testop2', 'testapp1', 'op2', Mock())
|
|
operation3 = manager.new('testop3', 'testapp2', 'op3', Mock())
|
|
manager.filter('testapp1') == [operation1, operation2]
|
|
manager.filter('testapp2') == [operation3]
|
|
|
|
|
|
def test_manager_collect_results():
|
|
"""Test collecting results from the manager."""
|
|
manager = OperationsManager()
|
|
event = threading.Event()
|
|
|
|
operation1 = manager.new('testop1', 'testapp1', 'op1', Mock())
|
|
operation2 = manager.new('testop2', 'testapp2', 'op2', Mock())
|
|
operation3 = manager.new('testop3', 'testapp1', 'op3', event.wait)
|
|
operation1.join()
|
|
operation2.join()
|
|
assert manager.collect_results('testapp1') == [operation1]
|
|
assert manager._operations == OrderedDict(testop2=operation2,
|
|
testop3=operation3)
|
|
event.set()
|
|
operation3.join()
|
|
assert manager.collect_results('testapp1') == [operation3]
|
|
assert manager._operations == OrderedDict(testop2=operation2)
|