From 40c7f97ff6fca6d2a63c14c7dcfff0da58e7c639 Mon Sep 17 00:00:00 2001 From: Nick Daly Date: Sun, 6 May 2012 18:01:08 -0500 Subject: [PATCH] Completed the module and cleaned up the documentation. --- ugly_hacks/santiago/pgpprocessor.py | 119 ++++++++++++++--------- ugly_hacks/santiago/test_pgpprocessor.py | 28 +----- 2 files changed, 77 insertions(+), 70 deletions(-) diff --git a/ugly_hacks/santiago/pgpprocessor.py b/ugly_hacks/santiago/pgpprocessor.py index 2d4b34da4..04a378396 100644 --- a/ugly_hacks/santiago/pgpprocessor.py +++ b/ugly_hacks/santiago/pgpprocessor.py @@ -1,6 +1,12 @@ -"""PGP message processing utilities.""" +"""PGP message processing utilities. -from errors import InvalidSignatureError, UnwillingHostError +Right now, this includes the Unwrapper, wihch unwraps and verifies each layer of +an onion-wrapped PGP message. + +FIXME: replace this with a real library. Why doesn't gnupg do this? + +""" +from errors import InvalidSignatureError import gnupg import re @@ -10,8 +16,6 @@ class Unwrapper(object): Good for singly- or multiply-wrapped messages. - FIXME: replace with a real library for this. Why doesn't gnupg do this? - After a single iteration, the original message is available in ``original_message`` while the message's contents are in ``str(Unwrapper)``. @@ -19,6 +23,16 @@ class Unwrapper(object): Sucessive iterations unwrap additional layers of the message. Good for onion-signed or -encrypted messages. + Using it is pretty darn simple. The following both creates and unwraps a + signed message:: + + >>> gpg = gnupg.GPG(use_agent = True) + >>> message = "hi" + >>> signed_message = str(gpg.sign(message, keyid = "0928D23A")) + >>> unwrapper = pgpprocessor.Unwrapper(signed_message) + >>> [message == x.strip() for x in unwrapper] + [True] + """ START, HEAD, BODY, FOOTER, END = "start", "header", "body", "footer", "end" @@ -40,7 +54,20 @@ class Unwrapper(object): def __init__(self, message, gpg = None, gnupg_new = None, gnupg_verify = None, gnupg_decrypt = None): + """Prepare to unwrap a PGP message. + If a gnupg.GPG instance isn't passed in as the ``gpg`` parameter, it's + created during instantiation with the ``gnupg_new`` keyword arguments. + + The ``_verify`` and ``_decrypt`` arguments are used when verifying + signatures and decrypting messages, respectively. + + post:: + + self.gpg # exists + self.gpg_data # exists + + """ if gnupg_new == None: gnupg_new = dict() if gnupg_verify == None: @@ -51,7 +78,6 @@ class Unwrapper(object): gpg = gnupg.GPG(**gnupg_new) self.message = message - self.gnupg_new = gnupg_new self.gnupg_verify = gnupg_verify self.gnupg_decrypt = gnupg_decrypt self.type = "" @@ -64,6 +90,11 @@ class Unwrapper(object): This resets it to a new or clean state, ready for the next iteration. + post:: + + True not in map((self.start, self.header, self.body, self.footer, + self.end, self.gpg_data), bool) + """ self.start = list() self.header = list() @@ -95,26 +126,26 @@ class Unwrapper(object): """ point = Unwrapper.START - type_ = "" + msg_type = "" self.reset_fields() for line in self.message.splitlines(True): if point == Unwrapper.START and line == Unwrapper.SIG_HEAD: point = Unwrapper.HEAD - type_ = Unwrapper.SIG + msg_type = Unwrapper.SIG elif point == Unwrapper.START and line == Unwrapper.CRYPT_HEAD: point = Unwrapper.HEAD - type_ = Unwrapper.CRYPT + msg_type = Unwrapper.CRYPT elif point == Unwrapper.HEAD and line == Unwrapper.SIG_BODY: point = Unwrapper.BODY elif (point == Unwrapper.BODY and line == Unwrapper.SIG_FOOTER and - type_ == Unwrapper.SIG): + msg_type == Unwrapper.SIG): point = Unwrapper.FOOTER elif ((point == Unwrapper.FOOTER and line == Unwrapper.SIG_END - and type_ == Unwrapper.SIG) + and msg_type == Unwrapper.SIG) or (point == Unwrapper.BODY and line == Unwrapper.CRYPT_END - and type_ == Unwrapper.CRYPT)): + and msg_type == Unwrapper.CRYPT)): # add the footer line to the footer, not the post-script self.footer.append(line) point = Unwrapper.END @@ -122,50 +153,61 @@ class Unwrapper(object): getattr(self, point).append(line) - self.handle_end_conditions(point, type_) + self.handle_end_conditions(point, msg_type) - return "".join(self.body) + self.type = msg_type + self.message = "".join(Unwrapper.unwrap(self.body, self.type)) - def handle_end_conditions(self, point, type_): + return self.message + + def handle_end_conditions(self, point, msg_type): """Handle end-conditions of message. Do the right thing based on the state machine's results. If there is no PGP data in the message, raise a StopIteration error. + # pre:: + # + # msg_type in (Unwrapper.CRYPT, Unwrapper.SIG) + # point == Unwrapper.END + """ - if point != Unwrapper.END or type_ not in (Unwrapper.CRYPT, - Unwrapper.SIG): + if point != Unwrapper.END or msg_type not in (Unwrapper.CRYPT, + Unwrapper.SIG): raise StopIteration("No valid PGP data.") - args = (self.gnupg_verify if type_ == Unwrapper.SIG + args = (self.gnupg_verify if msg_type == Unwrapper.SIG else self.gnupg_decrypt) self.gpg_data = { Unwrapper.SIG: self.gpg.verify, Unwrapper.CRYPT: self.gpg.decrypt - }[type_](str(self), **args) - - self.type = type_ - self.body = Unwrapper.unwrap(self.body, self.type) - - # reset the state machine, now that we've unwrapped a layer. - self.message = "".join(self.body) + }[msg_type](str(self), **args) if not (self.gpg_data and self.gpg_data.valid): raise InvalidSignatureError() + def __str__(self): + """Returns the original GPG-data in the unwrapped message. + + Non-PGP-message data (before and after the message) are not returned. + + """ + return "".join([ + "".join(x) for x in (self.header, self.body, self.footer) ]) + @classmethod - def unwrap(cls, message, type_): + def unwrap(cls, message, msg_type): """ pre:: - type_ in (Unwrapper.SIG, Unwrapper.CRYPT) + msg_type in (Unwrapper.SIG, Unwrapper.CRYPT) """ - if type_ == Unwrapper.SIG: + if msg_type == Unwrapper.SIG: target = Unwrapper.SIG_TARGET - elif type_ == Unwrapper.CRYPT: + elif msg_type == Unwrapper.CRYPT: target = Unwrapper.CRYPT_TARGET else: raise ValueError("Type must be one of: {0}".format( @@ -177,24 +219,7 @@ class Unwrapper(object): return message - def __str__(self): - """Returns the GPG-part of the current message. - - Non-PGP-message data (before and after the message) are not returned. - - """ - return "".join([ - "".join(x) for x in (self.header, self.body, self.footer) ]) - - def original_message(self): - """Returns the current wrapped message. - - It's an iterator, so previous iterations' data isn't available. - - """ - return "".join([ - "".join(x) for x in (self.start, self.header, self.body, - self.footer, self.end) ]) if __name__ == "__main__": - unittest.main() + import doctest + doctest.testmod() diff --git a/ugly_hacks/santiago/test_pgpprocessor.py b/ugly_hacks/santiago/test_pgpprocessor.py index 20a15fb00..518b4e5ad 100644 --- a/ugly_hacks/santiago/test_pgpprocessor.py +++ b/ugly_hacks/santiago/test_pgpprocessor.py @@ -8,10 +8,6 @@ isn't verifiable. """ -import sys -sys.path.extend(["../..", ".", "/home/nick/programs/python-gnupg/python-gnupg-0.2.9"]) - -from pprint import pprint import ConfigParser as configparser import gnupg import pgpprocessor @@ -23,7 +19,7 @@ def remove_line(string, line, preserve_newlines = True): """Remove a line from a multi-line string.""" if preserve_newlines and not line.endswith("\n"): - line =+ "\n" + line += "\n" return str(string.splitlines(preserve_newlines).remove(line)) @@ -45,9 +41,8 @@ class ProcessorCase(unittest.TestCase): MESSAGES.append(str(GPG.sign(MESSAGES[i], keyid = KEYID))) class UnwrapperTest(ProcessorCase): - """Verify that we can unwrap multiply-signed PGP messages correctly. + """Verify that we can unwrap multiply-signed PGP messages correctly.""" - """ def setUp(self): self.messages = list(ProcessorCase.MESSAGES) self.unwrapper = pgpprocessor.Unwrapper(self.messages[-1], @@ -64,7 +59,7 @@ class UnwrapperTest(ProcessorCase): # count each element in the iterator once, skipping the first. self.assertEqual(ITERATIONS, sum([1 for e in self.unwrapper])) - def test_creating_message_doesnt_unwrap(self): + def test_dont_uwrap(self): """Creating an unwrapper shouldn't unwrap the message. Only iterating through the unwrapper should unwrap it. We don't want to @@ -78,22 +73,9 @@ class UnwrapperTest(ProcessorCase): """The iterator should correctly unwrap each stage of the message.""" unwrapped_messages = self.messages[:-1] - - for i, message in enumerate(reversed(unwrapped_messages)): - unwrapped = self.unwrapper.next() - self.assertEqual(message.strip(), unwrapped.strip()) - - def test_original_message(self): - """Unwrapper.original_message actually returns the original message.""" - - unwrapped_messages = self.messages[:-1] - - for i, message in enumerate(reversed(unwrapped_messages)): - unwrapped = self.unwrapper.next() - - self.assertEqual(self.unwrapper.original_message().strip(), - message.strip()) + for message in reversed(unwrapped_messages): + self.assertEqual(message.strip(), self.unwrapper.next().strip()) def test_no_header_invalid(self): """Messages without heads are considered invalid."""