mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-01-21 07:55:00 +00:00
db: Serialize most of the database queries using locks
See db.py for rationale. Tests: - Run functional tests and unit tests. Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
parent
28143c091a
commit
daabeccb60
@ -13,6 +13,7 @@ from plinth import cfg
|
||||
from plinth.signals import post_app_loading
|
||||
|
||||
from . import clients as clients_module
|
||||
from . import db
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -157,8 +158,9 @@ class App:
|
||||
from . import models
|
||||
|
||||
try:
|
||||
app_entry = models.Module.objects.get(pk=self.app_id)
|
||||
return app_entry.setup_version
|
||||
with db.lock:
|
||||
app_entry = models.Module.objects.get(pk=self.app_id)
|
||||
return app_entry.setup_version
|
||||
except models.Module.DoesNotExist:
|
||||
return 0
|
||||
|
||||
@ -173,8 +175,9 @@ class App:
|
||||
"""Set the app's setup version."""
|
||||
from . import models
|
||||
|
||||
models.Module.objects.update_or_create(
|
||||
pk=self.app_id, defaults={'setup_version': version})
|
||||
with db.lock:
|
||||
models.Module.objects.update_or_create(
|
||||
pk=self.app_id, defaults={'setup_version': version})
|
||||
|
||||
def enable(self):
|
||||
"""Enable all the components of the app."""
|
||||
|
||||
85
plinth/db.py
Normal file
85
plinth/db.py
Normal file
@ -0,0 +1,85 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""
|
||||
Common utilities to help with handling a database.
|
||||
"""
|
||||
|
||||
import threading
|
||||
from typing import ClassVar
|
||||
|
||||
|
||||
class DBLock:
|
||||
"""A re-entrant lock with a fixed timeout (not -1) by default."""
|
||||
|
||||
TIMEOUT: ClassVar[float] = 30
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""Create an RLock object."""
|
||||
self._lock = threading.RLock(*args, **kwargs)
|
||||
self.timeout = DBLock.TIMEOUT
|
||||
|
||||
def __getattr__(self, name):
|
||||
"""Return RLock attributes."""
|
||||
return getattr(self._lock, name)
|
||||
|
||||
def __enter__(self):
|
||||
"""Use RLock context management."""
|
||||
return self._lock.acquire(timeout=self.timeout)
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
"""Use RLock context management."""
|
||||
try:
|
||||
self._lock.release()
|
||||
except RuntimeError:
|
||||
# Lock was not acquired
|
||||
pass
|
||||
|
||||
|
||||
# The Problem
|
||||
# -----------
|
||||
# Since this is a small application (in terms of data stored and requests per
|
||||
# second handled), we use sqlite3 as the backend database. This gives us more
|
||||
# reliability. However, sqlite3 has limited capability to handle parallel
|
||||
# requests. When we try to use multiple Model.update_or_create() calls
|
||||
# simultaneously, only the first one succeeds and the remaining fail with
|
||||
# 'Database is locked' errors.
|
||||
#
|
||||
# The 'timeout' value passed during connection creation means that queries will
|
||||
# wait for a maximum of given timeout period when trying to acquire locks.
|
||||
# However, in some cases, to prevent deadlocks caused by threads waiting on
|
||||
# each other, sqlite3 will not wait and immediately throw an exception.
|
||||
#
|
||||
# If we set the isolation level to 'EXCLUSIVE' or 'IMMEDIATE' instead of the
|
||||
# default 'DEFERRED', each transaction should acquire a write lock immediately
|
||||
# after transaction is started. This reduces the situations in which threads
|
||||
# deadlock on each other. This improves the situation somewhat. However, this
|
||||
# is still causing immediate timeout errors due to the way Django does its
|
||||
# update_or_create() queries.
|
||||
#
|
||||
# This is true even in 'WAL' (write-ahead-log) journaling mode. While WAL mode
|
||||
# makes it easier to have many read transactions while one write query is
|
||||
# happening, it does not prevent this situation.
|
||||
#
|
||||
# Django should ideally provide a way to serialize these queries specifically
|
||||
# to work with sqlite3 locking behavior. There seems to be no such way
|
||||
# currently.
|
||||
#
|
||||
# Workaround
|
||||
# ----------
|
||||
# To workaround the problem, use a simple lock to serialize all database
|
||||
# queries. Like this:
|
||||
#
|
||||
# with db.lock:
|
||||
# # do some short database operation
|
||||
#
|
||||
# Queries usually have short execution time (unless stuck on disk I/O). A lot
|
||||
# of parallel request processing is not expected from this service. We want
|
||||
# more reliability (not failing on DB locks) over the ability to run parallel
|
||||
# requests. Typically, requests waiting to perform DB queries will only have to
|
||||
# wait < 1 second to get their turn. In the worst case, the database lock will
|
||||
# not be acquired and the code continues to anyway.
|
||||
#
|
||||
# This locking can't be done in all situations. For example, queries made
|
||||
# within Django framework are not locked. Still, this approach should prevent
|
||||
# most of the significant cases where we have seen database lock issues.
|
||||
|
||||
lock = DBLock()
|
||||
@ -3,35 +3,41 @@
|
||||
Simple key/value store using Django models
|
||||
"""
|
||||
|
||||
from . import db
|
||||
|
||||
|
||||
def get(key):
|
||||
"""Return the value of a key"""
|
||||
from plinth.models import KVStore
|
||||
|
||||
# pylint: disable-msg=E1101
|
||||
return KVStore.objects.get(pk=key).value
|
||||
with db.lock:
|
||||
# pylint: disable-msg=E1101
|
||||
return KVStore.objects.get(pk=key).value
|
||||
|
||||
|
||||
def get_default(key, default_value):
|
||||
"""Return the value of the key if key exists else return default_value"""
|
||||
try:
|
||||
return get(key)
|
||||
except Exception:
|
||||
return default_value
|
||||
with db.lock:
|
||||
try:
|
||||
return get(key)
|
||||
except Exception:
|
||||
return default_value
|
||||
|
||||
|
||||
def set(key, value): # pylint: disable-msg=W0622
|
||||
"""Store the value of a key"""
|
||||
from plinth.models import KVStore
|
||||
store = KVStore(key=key, value=value)
|
||||
store.save()
|
||||
with db.lock:
|
||||
store = KVStore(key=key, value=value)
|
||||
store.save()
|
||||
|
||||
|
||||
def delete(key, ignore_missing=False):
|
||||
"""Delete a key"""
|
||||
from plinth.models import KVStore
|
||||
try:
|
||||
return KVStore.objects.get(key=key).delete()
|
||||
except KVStore.DoesNotExist:
|
||||
if not ignore_missing:
|
||||
raise
|
||||
with db.lock:
|
||||
try:
|
||||
return KVStore.objects.get(key=key).delete()
|
||||
except KVStore.DoesNotExist:
|
||||
if not ignore_missing:
|
||||
raise
|
||||
|
||||
@ -11,6 +11,8 @@ from django.core.exceptions import ValidationError
|
||||
from django.db import models
|
||||
from django.dispatch import receiver
|
||||
|
||||
from . import db
|
||||
|
||||
|
||||
class KVStore(models.Model):
|
||||
"""Model to store retrieve key/value configuration"""
|
||||
@ -45,14 +47,16 @@ class UserProfile(models.Model):
|
||||
@receiver(models.signals.post_save, sender=User)
|
||||
def _on_user_post_save(sender, instance, **kwargs):
|
||||
"""When the user model is saved, user profile too."""
|
||||
if hasattr(instance, 'userprofile'):
|
||||
instance.userprofile.save()
|
||||
else:
|
||||
UserProfile.objects.update_or_create(user=instance)
|
||||
with db.lock:
|
||||
if hasattr(instance, 'userprofile'):
|
||||
instance.userprofile.save()
|
||||
else:
|
||||
UserProfile.objects.update_or_create(user=instance)
|
||||
|
||||
|
||||
class JSONField(models.TextField):
|
||||
"""Store and retrieve JSON data into a TextField."""
|
||||
|
||||
def to_python(self, value):
|
||||
"""Deserialize a text string from form field to Python dict."""
|
||||
if not value:
|
||||
|
||||
@ -13,7 +13,7 @@ from django.utils.translation import gettext
|
||||
|
||||
from plinth import cfg
|
||||
|
||||
from . import models
|
||||
from . import db, models
|
||||
|
||||
severities = {'exception': 5, 'error': 4, 'warning': 3, 'info': 2, 'debug': 1}
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -176,7 +176,8 @@ class Notification(models.StoredNotification):
|
||||
|
||||
"""
|
||||
self.dismissed = should_dismiss
|
||||
super().save()
|
||||
with db.lock:
|
||||
super().save()
|
||||
|
||||
def clean(self):
|
||||
"""Perform additional validations on the model."""
|
||||
@ -228,16 +229,19 @@ class Notification(models.StoredNotification):
|
||||
|
||||
"""
|
||||
id = kwargs.pop('id')
|
||||
return Notification.objects.update_or_create(defaults=kwargs, id=id)[0]
|
||||
with db.lock:
|
||||
return Notification.objects.update_or_create(
|
||||
defaults=kwargs, id=id)[0]
|
||||
|
||||
@staticmethod
|
||||
def get(key): # pylint: disable=redefined-builtin
|
||||
"""Return a notification object with a matching ID."""
|
||||
# pylint: disable=no-member
|
||||
try:
|
||||
return Notification.objects.get(pk=key)
|
||||
except Notification.DoesNotExist:
|
||||
raise KeyError('No such notification')
|
||||
with db.lock:
|
||||
try:
|
||||
return Notification.objects.get(pk=key)
|
||||
except Notification.DoesNotExist:
|
||||
raise KeyError('No such notification')
|
||||
|
||||
@staticmethod
|
||||
def list(key=None, app_id=None, user=None, dismissed=False):
|
||||
@ -274,7 +278,8 @@ class Notification(models.StoredNotification):
|
||||
if dismissed is not None:
|
||||
filters.append(Q(dismissed=dismissed))
|
||||
|
||||
return Notification.objects.filter(*filters)[0:10]
|
||||
with db.lock:
|
||||
return Notification.objects.filter(*filters)[0:10]
|
||||
|
||||
@staticmethod
|
||||
def _translate(string_, data=None):
|
||||
|
||||
68
plinth/tests/test_db.py
Normal file
68
plinth/tests/test_db.py
Normal file
@ -0,0 +1,68 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""
|
||||
Tests for database utilities.
|
||||
"""
|
||||
import threading
|
||||
import time
|
||||
|
||||
from .. import db
|
||||
|
||||
|
||||
def test_db_lock_no_wait():
|
||||
"""Test that lock is immediately by first user."""
|
||||
lock = db.DBLock()
|
||||
|
||||
start_time = time.time()
|
||||
with lock:
|
||||
pass
|
||||
|
||||
end_time = time.time()
|
||||
assert end_time - start_time < 0.1
|
||||
|
||||
|
||||
def test_db_lock_max_wait():
|
||||
"""Test that lock waits only for timeout period."""
|
||||
event = threading.Event()
|
||||
lock = db.DBLock()
|
||||
lock.timeout = 0.25
|
||||
|
||||
def thread_func():
|
||||
with lock:
|
||||
event.set()
|
||||
time.sleep(0.3)
|
||||
|
||||
thread = threading.Thread(target=thread_func)
|
||||
thread.start()
|
||||
|
||||
event.wait()
|
||||
start_time = time.time()
|
||||
with lock as return_value:
|
||||
pass
|
||||
|
||||
end_time = time.time()
|
||||
assert end_time - start_time < 0.27
|
||||
assert not return_value
|
||||
|
||||
|
||||
def test_db_lock_release():
|
||||
"""Test that lock is available after release."""
|
||||
event = threading.Event()
|
||||
lock = db.DBLock()
|
||||
lock.timeout = 0.25
|
||||
|
||||
def thread_func():
|
||||
with lock:
|
||||
event.set()
|
||||
time.sleep(0.2)
|
||||
|
||||
thread = threading.Thread(target=thread_func)
|
||||
thread.start()
|
||||
|
||||
event.wait()
|
||||
start_time = time.time()
|
||||
with lock as return_value:
|
||||
pass
|
||||
|
||||
end_time = time.time()
|
||||
assert return_value
|
||||
assert end_time - start_time <= 0.23
|
||||
Loading…
x
Reference in New Issue
Block a user