*: Add some additional type annotations

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Sunil Mohan Adapa 2023-09-23 22:31:31 -07:00 committed by James Valleroy
parent 38ece87c6c
commit bb7782a464
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808

View File

@ -127,7 +127,7 @@ class Operation:
return None
@property
def translated_message(self):
def translated_message(self) -> str:
"""Return a message about status of operation after translating.
Must be called from a web request (UI) thread with user language set so
@ -143,7 +143,7 @@ class Operation:
return message
def _update_notification(self):
def _update_notification(self) -> None:
"""Show an updated notification if needed."""
if not self.show_notification:
return
@ -169,7 +169,7 @@ class Operation:
class OperationsManager:
"""Global handler for all operations and their results."""
def __init__(self):
def __init__(self) -> None:
"""Initialize the object."""
self._operations: list[Operation] = []
self._current_operation: Operation | None = None
@ -182,7 +182,7 @@ class OperationsManager:
# when done from the same thread which holds the lock.
self._lock = threading.RLock()
def new(self, *args, **kwargs):
def new(self, *args, **kwargs) -> Operation:
"""Create a new operation instance and add to global list."""
with self._lock:
kwargs['on_complete'] = self._on_operation_complete
@ -192,7 +192,7 @@ class OperationsManager:
self._schedule_next()
return operation
def _on_operation_complete(self, operation):
def _on_operation_complete(self, operation: Operation):
"""Trigger next operation. Called from within previous thread."""
logger.debug('%s: on_complete called', operation)
with self._lock:
@ -203,7 +203,7 @@ class OperationsManager:
self._schedule_next()
def _schedule_next(self):
def _schedule_next(self) -> None:
"""Schedule the next available operation."""
with self._lock:
if self._current_operation:
@ -216,7 +216,7 @@ class OperationsManager:
operation.run()
break
def filter(self, app_id):
def filter(self, app_id: str) -> list[Operation]:
"""Return operations matching a pattern."""
with self._lock:
return [
@ -224,7 +224,7 @@ class OperationsManager:
if operation.app_id == app_id
]
def collect_results(self, app_id):
def collect_results(self, app_id: str) -> list[Operation]:
"""Return the finished operations for an app."""
results: list[Operation] = []
remaining: list[Operation] = []