mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-05-20 10:34:30 +00:00
Renamed PgpUnwrapper to Unwrapper; update correct lines.
The update-correct-lines fix is a functional one, there are tests to avoid these sorts of stupid problems now. Also, compile the regexes only once.
This commit is contained in:
parent
873fed5a28
commit
8650c6f447
@ -1,6 +1,11 @@
|
||||
"""PGP message processing utilities."""
|
||||
|
||||
class PgpUnwrapper(object):
|
||||
from errors import InvalidSignatureError, UnwillingHostError
|
||||
import gnupg
|
||||
import re
|
||||
|
||||
|
||||
class Unwrapper(object):
|
||||
"""Removes one layer of PGP message header and footer per iteration.
|
||||
|
||||
Good for singly- or multiply-wrapped messages.
|
||||
@ -9,7 +14,7 @@ class PgpUnwrapper(object):
|
||||
|
||||
After a single iteration, the original message is available in
|
||||
``original_message`` while the message's contents are in
|
||||
``str(PgpUnwrapper)``.
|
||||
``str(Unwrapper)``.
|
||||
|
||||
Sucessive iterations unwrap additional layers of the message. Good for
|
||||
onion-signed or -encrypted messages.
|
||||
@ -28,6 +33,11 @@ class PgpUnwrapper(object):
|
||||
CRYPT_LINES = (CRYPT_HEAD, CRYPT_END) = ("-----BEGIN PGP MESSAGE-----",
|
||||
"-----END PGP MESSAGE-----")
|
||||
|
||||
# ^- - - (-----SOMETHING-----|-----OTHERTHING-----)$
|
||||
_TARGET = "^(- )+({0})$"
|
||||
SIG_TARGET = re.compile(_TARGET.format("|".join(SIG_LINES)))
|
||||
CRYPT_TARGET = re.compile(_TARGET.format("|".join(CRYPT_LINES)))
|
||||
|
||||
def __init__(self, message, gpg = None,
|
||||
gnupg_new = None, gnupg_verify = None, gnupg_decrypt = None):
|
||||
|
||||
@ -73,38 +83,45 @@ class PgpUnwrapper(object):
|
||||
|
||||
Raise an InvalidSignature Error if signature isn't valid.
|
||||
|
||||
This is a really simple state-machine: certain lines advance the state
|
||||
of the machine, and until the machine is advanced again, all lines are
|
||||
added to that part of the message. When we reach the end of the
|
||||
message, the state machine is reset, but starts with the message it just
|
||||
unwrapped.
|
||||
This is a simple state-machine: certain lines advance the state of the
|
||||
machine, and until the machine is advanced again, all lines are added to
|
||||
that part of the message. When we reach the end of the message, the
|
||||
state machine is reset, but starts with the message it just unwrapped.
|
||||
|
||||
It has one quirk in two states: the head -> body and footer -> end
|
||||
transitions append their lines to the former state and then change
|
||||
state, unlike all the other transitions which change state first.
|
||||
Messages would be built non-intuitively without those exceptions.
|
||||
|
||||
"""
|
||||
point = PgpUnwrapper.START
|
||||
point = Unwrapper.START
|
||||
type_ = ""
|
||||
|
||||
self.reset_fields()
|
||||
|
||||
for line in self.message.splitlines():
|
||||
if point == PgpUnwrapper.START and line == PgpUnwrapper.SIG_HEAD:
|
||||
point = PgpUnwrapper.HEAD
|
||||
type_ = PgpUnwrapper.SIG
|
||||
elif point == PgpUnwrapper.START and line == PgpUnwrapper.CRYPT_HEAD:
|
||||
point = PgpUnwrapper.HEAD
|
||||
type_ = PgpUnwrapper.CRYPT
|
||||
elif point == PgpUnwrapper.HEAD and line == PgpUnwrapper.SIG_BODY:
|
||||
point = PgpUnwrapper.BODY
|
||||
elif (point == PgpUnwrapper.BODY and line == PgpUnwrapper.SIG_FOOTER and
|
||||
type_ == PgpUnwrapper.SIG):
|
||||
point = PgpUnwrapper.FOOTER
|
||||
elif ((point == PgpUnwrapper.FOOTER and line == PgpUnwrapper.SIG_END and type_ == PgpUnwrapper.SIG) or
|
||||
(point == PgpUnwrapper.BODY and line == PgpUnwrapper.CRYPT_END and type_ == PgpUnwrapper.CRYPT)):
|
||||
for line in self.message.splitlines(True):
|
||||
if point == Unwrapper.START and line == Unwrapper.SIG_HEAD:
|
||||
point = Unwrapper.HEAD
|
||||
type_ = Unwrapper.SIG
|
||||
elif point == Unwrapper.START and line == Unwrapper.CRYPT_HEAD:
|
||||
point = Unwrapper.HEAD
|
||||
type_ = Unwrapper.CRYPT
|
||||
elif point == Unwrapper.HEAD and line == Unwrapper.SIG_BODY:
|
||||
point = Unwrapper.BODY
|
||||
elif (point == Unwrapper.BODY and line == Unwrapper.SIG_FOOTER and
|
||||
type_ == Unwrapper.SIG):
|
||||
point = Unwrapper.FOOTER
|
||||
elif ((point == Unwrapper.FOOTER and line == Unwrapper.SIG_END
|
||||
and type_ == Unwrapper.SIG)
|
||||
or (point == Unwrapper.BODY and line == Unwrapper.CRYPT_END
|
||||
and type_ == Unwrapper.CRYPT)):
|
||||
# add the footer line to the footer, not the post-script
|
||||
self.footer.append(line)
|
||||
point = PgpUnwrapper.END
|
||||
point = Unwrapper.END
|
||||
continue
|
||||
|
||||
getattr(self, point).append(line)
|
||||
|
||||
|
||||
self.handle_message(point, type_)
|
||||
|
||||
return "\n".join(self.body)
|
||||
@ -115,26 +132,25 @@ class PgpUnwrapper(object):
|
||||
Do the right thing based on the state machine's results.
|
||||
|
||||
"""
|
||||
if point != PgpUnwrapper.END or type_ not in (PgpUnwrapper.CRYPT,
|
||||
PgpUnwrapper.SIG):
|
||||
if point != Unwrapper.END or type_ not in (Unwrapper.CRYPT,
|
||||
Unwrapper.SIG):
|
||||
raise StopIteration("No valid PGP data.")
|
||||
|
||||
args = (self.gnupg_verify if type_ == PgpUnwrapper.SIG
|
||||
args = (self.gnupg_verify if type_ == Unwrapper.SIG
|
||||
else self.gnupg_decrypt)
|
||||
|
||||
# TODO figure this out.
|
||||
self.gpg_data = {
|
||||
PgpUnwrapper.SIG: self.gpg.verify,
|
||||
PgpUnwrapper.CRYPT: self.gpg.decrypt
|
||||
Unwrapper.SIG: self.gpg.verify,
|
||||
Unwrapper.CRYPT: self.gpg.decrypt
|
||||
}[type_](str(self), **args)
|
||||
|
||||
self.type = type_
|
||||
self.body = PgpUnwrapper.unwrap(self.body)
|
||||
self.body = Unwrapper.unwrap(self.body, self.type)
|
||||
|
||||
# reset the state machine, now that we've unwrapped a layer.
|
||||
self.message = "\n".join(self.body)
|
||||
|
||||
if not data:
|
||||
if not self.gpg_data:
|
||||
raise InvalidSignatureError()
|
||||
|
||||
@classmethod
|
||||
@ -143,24 +159,20 @@ class PgpUnwrapper(object):
|
||||
|
||||
pre::
|
||||
|
||||
type_ in (PgpUnwrapper.SIG, PgpUnwrapper.CRYPT)
|
||||
type_ in (Unwrapper.SIG, Unwrapper.CRYPT)
|
||||
|
||||
"""
|
||||
if type_ == PgpUnwrapper.SIG:
|
||||
lines = PgpUnwrapper.SIG_LINES
|
||||
elif type_ == PgpUnwrapper.CRYPT:
|
||||
lines = PgpUnwrapper.CRYPT_LINES
|
||||
if type_ == Unwrapper.SIG:
|
||||
target = Unwrapper.SIG_TARGET
|
||||
elif type_ == Unwrapper.CRYPT:
|
||||
target = Unwrapper.CRYPT_TARGET
|
||||
else:
|
||||
raise ValueError("Type must be one of: {0}".format(
|
||||
", ".join(PgpUnwrapper.TYPES)))
|
||||
", ".join(Unwrapper.TYPES)))
|
||||
|
||||
# ^- - - (-----SOMETHING-----|-----OTHERTHING-----)$
|
||||
target = re.compile("^(- )+({0})$".format("|".join(lines)))
|
||||
|
||||
|
||||
for line in message:
|
||||
for i, line in enumerate(message):
|
||||
if target.match(line):
|
||||
message[message.index(line)] = line[2:]
|
||||
message[i] = line[2:]
|
||||
|
||||
return message
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user