Completed the module and cleaned up the documentation.

This commit is contained in:
Nick Daly 2012-05-06 18:01:08 -05:00
parent c28f5464ed
commit 40c7f97ff6
2 changed files with 77 additions and 70 deletions

View File

@ -1,6 +1,12 @@
"""PGP message processing utilities.""" """PGP message processing utilities.
from errors import InvalidSignatureError, UnwillingHostError Right now, this includes the Unwrapper, wihch unwraps and verifies each layer of
an onion-wrapped PGP message.
FIXME: replace this with a real library. Why doesn't gnupg do this?
"""
from errors import InvalidSignatureError
import gnupg import gnupg
import re import re
@ -10,8 +16,6 @@ class Unwrapper(object):
Good for singly- or multiply-wrapped messages. Good for singly- or multiply-wrapped messages.
FIXME: replace with a real library for this. Why doesn't gnupg do this?
After a single iteration, the original message is available in After a single iteration, the original message is available in
``original_message`` while the message's contents are in ``original_message`` while the message's contents are in
``str(Unwrapper)``. ``str(Unwrapper)``.
@ -19,6 +23,16 @@ class Unwrapper(object):
Sucessive iterations unwrap additional layers of the message. Good for Sucessive iterations unwrap additional layers of the message. Good for
onion-signed or -encrypted messages. onion-signed or -encrypted messages.
Using it is pretty darn simple. The following both creates and unwraps a
signed message::
>>> gpg = gnupg.GPG(use_agent = True)
>>> message = "hi"
>>> signed_message = str(gpg.sign(message, keyid = "0928D23A"))
>>> unwrapper = pgpprocessor.Unwrapper(signed_message)
>>> [message == x.strip() for x in unwrapper]
[True]
""" """
START, HEAD, BODY, FOOTER, END = "start", "header", "body", "footer", "end" START, HEAD, BODY, FOOTER, END = "start", "header", "body", "footer", "end"
@ -40,7 +54,20 @@ class Unwrapper(object):
def __init__(self, message, gpg = None, def __init__(self, message, gpg = None,
gnupg_new = None, gnupg_verify = None, gnupg_decrypt = None): gnupg_new = None, gnupg_verify = None, gnupg_decrypt = None):
"""Prepare to unwrap a PGP message.
If a gnupg.GPG instance isn't passed in as the ``gpg`` parameter, it's
created during instantiation with the ``gnupg_new`` keyword arguments.
The ``_verify`` and ``_decrypt`` arguments are used when verifying
signatures and decrypting messages, respectively.
post::
self.gpg # exists
self.gpg_data # exists
"""
if gnupg_new == None: if gnupg_new == None:
gnupg_new = dict() gnupg_new = dict()
if gnupg_verify == None: if gnupg_verify == None:
@ -51,7 +78,6 @@ class Unwrapper(object):
gpg = gnupg.GPG(**gnupg_new) gpg = gnupg.GPG(**gnupg_new)
self.message = message self.message = message
self.gnupg_new = gnupg_new
self.gnupg_verify = gnupg_verify self.gnupg_verify = gnupg_verify
self.gnupg_decrypt = gnupg_decrypt self.gnupg_decrypt = gnupg_decrypt
self.type = "" self.type = ""
@ -64,6 +90,11 @@ class Unwrapper(object):
This resets it to a new or clean state, ready for the next iteration. This resets it to a new or clean state, ready for the next iteration.
post::
True not in map((self.start, self.header, self.body, self.footer,
self.end, self.gpg_data), bool)
""" """
self.start = list() self.start = list()
self.header = list() self.header = list()
@ -95,26 +126,26 @@ class Unwrapper(object):
""" """
point = Unwrapper.START point = Unwrapper.START
type_ = "" msg_type = ""
self.reset_fields() self.reset_fields()
for line in self.message.splitlines(True): for line in self.message.splitlines(True):
if point == Unwrapper.START and line == Unwrapper.SIG_HEAD: if point == Unwrapper.START and line == Unwrapper.SIG_HEAD:
point = Unwrapper.HEAD point = Unwrapper.HEAD
type_ = Unwrapper.SIG msg_type = Unwrapper.SIG
elif point == Unwrapper.START and line == Unwrapper.CRYPT_HEAD: elif point == Unwrapper.START and line == Unwrapper.CRYPT_HEAD:
point = Unwrapper.HEAD point = Unwrapper.HEAD
type_ = Unwrapper.CRYPT msg_type = Unwrapper.CRYPT
elif point == Unwrapper.HEAD and line == Unwrapper.SIG_BODY: elif point == Unwrapper.HEAD and line == Unwrapper.SIG_BODY:
point = Unwrapper.BODY point = Unwrapper.BODY
elif (point == Unwrapper.BODY and line == Unwrapper.SIG_FOOTER and elif (point == Unwrapper.BODY and line == Unwrapper.SIG_FOOTER and
type_ == Unwrapper.SIG): msg_type == Unwrapper.SIG):
point = Unwrapper.FOOTER point = Unwrapper.FOOTER
elif ((point == Unwrapper.FOOTER and line == Unwrapper.SIG_END elif ((point == Unwrapper.FOOTER and line == Unwrapper.SIG_END
and type_ == Unwrapper.SIG) and msg_type == Unwrapper.SIG)
or (point == Unwrapper.BODY and line == Unwrapper.CRYPT_END or (point == Unwrapper.BODY and line == Unwrapper.CRYPT_END
and type_ == Unwrapper.CRYPT)): and msg_type == Unwrapper.CRYPT)):
# add the footer line to the footer, not the post-script # add the footer line to the footer, not the post-script
self.footer.append(line) self.footer.append(line)
point = Unwrapper.END point = Unwrapper.END
@ -122,50 +153,61 @@ class Unwrapper(object):
getattr(self, point).append(line) getattr(self, point).append(line)
self.handle_end_conditions(point, type_) self.handle_end_conditions(point, msg_type)
return "".join(self.body) self.type = msg_type
self.message = "".join(Unwrapper.unwrap(self.body, self.type))
def handle_end_conditions(self, point, type_): return self.message
def handle_end_conditions(self, point, msg_type):
"""Handle end-conditions of message. """Handle end-conditions of message.
Do the right thing based on the state machine's results. If there is no Do the right thing based on the state machine's results. If there is no
PGP data in the message, raise a StopIteration error. PGP data in the message, raise a StopIteration error.
# pre::
#
# msg_type in (Unwrapper.CRYPT, Unwrapper.SIG)
# point == Unwrapper.END
""" """
if point != Unwrapper.END or type_ not in (Unwrapper.CRYPT, if point != Unwrapper.END or msg_type not in (Unwrapper.CRYPT,
Unwrapper.SIG): Unwrapper.SIG):
raise StopIteration("No valid PGP data.") raise StopIteration("No valid PGP data.")
args = (self.gnupg_verify if type_ == Unwrapper.SIG args = (self.gnupg_verify if msg_type == Unwrapper.SIG
else self.gnupg_decrypt) else self.gnupg_decrypt)
self.gpg_data = { self.gpg_data = {
Unwrapper.SIG: self.gpg.verify, Unwrapper.SIG: self.gpg.verify,
Unwrapper.CRYPT: self.gpg.decrypt Unwrapper.CRYPT: self.gpg.decrypt
}[type_](str(self), **args) }[msg_type](str(self), **args)
self.type = type_
self.body = Unwrapper.unwrap(self.body, self.type)
# reset the state machine, now that we've unwrapped a layer.
self.message = "".join(self.body)
if not (self.gpg_data and self.gpg_data.valid): if not (self.gpg_data and self.gpg_data.valid):
raise InvalidSignatureError() raise InvalidSignatureError()
def __str__(self):
"""Returns the original GPG-data in the unwrapped message.
Non-PGP-message data (before and after the message) are not returned.
"""
return "".join([
"".join(x) for x in (self.header, self.body, self.footer) ])
@classmethod @classmethod
def unwrap(cls, message, type_): def unwrap(cls, message, msg_type):
""" """
pre:: pre::
type_ in (Unwrapper.SIG, Unwrapper.CRYPT) msg_type in (Unwrapper.SIG, Unwrapper.CRYPT)
""" """
if type_ == Unwrapper.SIG: if msg_type == Unwrapper.SIG:
target = Unwrapper.SIG_TARGET target = Unwrapper.SIG_TARGET
elif type_ == Unwrapper.CRYPT: elif msg_type == Unwrapper.CRYPT:
target = Unwrapper.CRYPT_TARGET target = Unwrapper.CRYPT_TARGET
else: else:
raise ValueError("Type must be one of: {0}".format( raise ValueError("Type must be one of: {0}".format(
@ -177,24 +219,7 @@ class Unwrapper(object):
return message return message
def __str__(self):
"""Returns the GPG-part of the current message.
Non-PGP-message data (before and after the message) are not returned.
"""
return "".join([
"".join(x) for x in (self.header, self.body, self.footer) ])
def original_message(self):
"""Returns the current wrapped message.
It's an iterator, so previous iterations' data isn't available.
"""
return "".join([
"".join(x) for x in (self.start, self.header, self.body,
self.footer, self.end) ])
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() import doctest
doctest.testmod()

View File

@ -8,10 +8,6 @@ isn't verifiable.
""" """
import sys
sys.path.extend(["../..", ".", "/home/nick/programs/python-gnupg/python-gnupg-0.2.9"])
from pprint import pprint
import ConfigParser as configparser import ConfigParser as configparser
import gnupg import gnupg
import pgpprocessor import pgpprocessor
@ -23,7 +19,7 @@ def remove_line(string, line, preserve_newlines = True):
"""Remove a line from a multi-line string.""" """Remove a line from a multi-line string."""
if preserve_newlines and not line.endswith("\n"): if preserve_newlines and not line.endswith("\n"):
line =+ "\n" line += "\n"
return str(string.splitlines(preserve_newlines).remove(line)) return str(string.splitlines(preserve_newlines).remove(line))
@ -45,9 +41,8 @@ class ProcessorCase(unittest.TestCase):
MESSAGES.append(str(GPG.sign(MESSAGES[i], keyid = KEYID))) MESSAGES.append(str(GPG.sign(MESSAGES[i], keyid = KEYID)))
class UnwrapperTest(ProcessorCase): class UnwrapperTest(ProcessorCase):
"""Verify that we can unwrap multiply-signed PGP messages correctly. """Verify that we can unwrap multiply-signed PGP messages correctly."""
"""
def setUp(self): def setUp(self):
self.messages = list(ProcessorCase.MESSAGES) self.messages = list(ProcessorCase.MESSAGES)
self.unwrapper = pgpprocessor.Unwrapper(self.messages[-1], self.unwrapper = pgpprocessor.Unwrapper(self.messages[-1],
@ -64,7 +59,7 @@ class UnwrapperTest(ProcessorCase):
# count each element in the iterator once, skipping the first. # count each element in the iterator once, skipping the first.
self.assertEqual(ITERATIONS, sum([1 for e in self.unwrapper])) self.assertEqual(ITERATIONS, sum([1 for e in self.unwrapper]))
def test_creating_message_doesnt_unwrap(self): def test_dont_uwrap(self):
"""Creating an unwrapper shouldn't unwrap the message. """Creating an unwrapper shouldn't unwrap the message.
Only iterating through the unwrapper should unwrap it. We don't want to Only iterating through the unwrapper should unwrap it. We don't want to
@ -79,21 +74,8 @@ class UnwrapperTest(ProcessorCase):
unwrapped_messages = self.messages[:-1] unwrapped_messages = self.messages[:-1]
for i, message in enumerate(reversed(unwrapped_messages)): for message in reversed(unwrapped_messages):
unwrapped = self.unwrapper.next() self.assertEqual(message.strip(), self.unwrapper.next().strip())
self.assertEqual(message.strip(), unwrapped.strip())
def test_original_message(self):
"""Unwrapper.original_message actually returns the original message."""
unwrapped_messages = self.messages[:-1]
for i, message in enumerate(reversed(unwrapped_messages)):
unwrapped = self.unwrapper.next()
self.assertEqual(self.unwrapper.original_message().strip(),
message.strip())
def test_no_header_invalid(self): def test_no_header_invalid(self):
"""Messages without heads are considered invalid.""" """Messages without heads are considered invalid."""