#!/usr/bin/env python2.4
#
# This file is part of gpgpy-ezmlm - an encryption filter for the
# ezmlm-idx mailinglist manager.
#
# Copyright 02007 Sense.Lab e.V.
#
# This script decrypts an incoming mail and encrypts it for each recipient
# separately. Afterwards it calls qmail-queue for each recipient.
# It is meant as a wrapper around qmail-queue. See 'man qmail-queue' for
# details of the qmail-queue interface.
#
# Syntax:
#   gpgpy-ezmlm-encrypt [MAILINGLIST_DIRECTORY]
#
# If no MAILINGLIST_DIRECTORY is given, then it will only run some self-tests.
#
# Environment settings:
#   - QMAILQUEUE should contain the path of the qmail-queue program (or a
#     substitute - e.g. for spam filtering) - otherwise the default location
#     /var/qmail/bin/qmail-queue is used
#
#
# gpgpy-ezmlm is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# gpgpy-ezmlm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with the CryptoBox; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# TODO:
#   implement 'sign_messages'
#   implement 'hide_subject'

import sys, os, re
import email
try:
    ## the module names this script was written against
    import email.Parser
    import email.Message
except ImportError:
    ## later releases of the email package only ship the lower case names -
    ## fall back to them, so this module can at least be imported there
    import email.parser
    import email.message
import subprocess


############### some default settings ##################

## we will finally deliver via the original qmail-queue
QMAILQUEUE_BIN = os.environ.get("QMAILQUEUE", '/var/qmail/bin/qmail-queue')

## the list specific configuration file
CONF_FILE = 'conf-gpgpy'

## default settings - the setting keys must always be lower case!
DEFAULT_SETTINGS = {
    "plain_without_key": False,
    "sign_messages": False,
    "gnupg_dir": ".gnupg",
}

## put this warning into a mail instead of the encrypted content, in case
## the key of the recipient is missing
NOKEY_WARNING = "No valid encryption key found!"

## in case of 'cannot decrypt' errors
DECRYPT_ERROR = ("Failed to decrypt the mail that was sent to the mailinglist. "
        "Please check the configuration of the mailinglist. "
        "Maybe the sender used an invalid encryption key?")


################ the encryption handler ################

class MailEncryption:
    """Decrypt incoming list mails and encrypt them again for one recipient.

    One instance is bound to one mailinglist directory. The settings are
    read from CONF_FILE inside of this directory.
    """

    def __init__(self, list_dir):
        """Read the configuration of the list and prepare the gpgme context.

        list_dir: the directory of the mailinglist

        The program exits via 'internal_error' if the list directory, the
        config file or the gnupg directory is not accessible.
        """
        self.list_dir = os.path.abspath(list_dir)
        ## does the list directory exist?
        if not os.access(self.list_dir, os.X_OK):
            internal_error("Could not access mailinglist directory: %s" \
                    % self.list_dir)
        ## read the config file
        self.config = self.read_config()
        ## does the gnupg directory exist?
        if not os.access(self.config["gnupg_dir"], os.X_OK):
            internal_error("Could not access gnupg directory: %s" \
                    % self.config["gnupg_dir"])
        ## set GNUPGHOME environment - this should be used by pyme
        os.environ["GNUPGHOME"] = self.config["gnupg_dir"]
        ## we _must_ import pyme after configuring the GNUPGHOME setting
        import pyme.core
        ## 'decrypt_block' refers to pyme.errors - import it explicitly
        ## instead of relying on pyme.core to pull it in
        import pyme.errors
        self.pyme = pyme
        self.context = self.pyme.core.Context()
        self.context.set_armor(1)

    def read_config(self):
        """Read the config file and return the settings as a dictionary.

        If a value is not defined in the config file, then the default value
        is used. Any line that does not really look like a config setting is
        ignored. Any unknown configuration settings are ignored, too.

        The returned 'gnupg_dir' is always an absolute path: a leading '~'
        is expanded and a relative path is resolved against the mailinglist
        directory.

        The program exits via 'internal_error' if the file is not readable.
        """
        ## work on a copy - the module wide defaults must stay untouched
        result = DEFAULT_SETTINGS.copy()
        ## retrieve the absolute path of the configuration file
        ## by default it is relative to the mailinglist's directory
        if os.path.isabs(CONF_FILE):
            conf_file = CONF_FILE
        else:
            conf_file = os.path.join(self.list_dir, CONF_FILE)
        if not os.access(conf_file, os.R_OK):
            internal_error("Could not read gpgpy-ezmlm config file: %s" % conf_file)
        ## read all lines of the configuration file
        conf_handle = open(conf_file)
        try:
            all_lines = [ raw.strip() for raw in conf_handle.readlines() ]
        finally:
            conf_handle.close()
        for line in all_lines:
            ## ignore empty lines, comments and lines without "="
            if (not line) or line.startswith("#") or (line.find("=") == -1):
                continue
            key, value = line.split("=", 1)
            ## turn the key to lower case and remove surrounding whitespace
            key, value = (key.strip().lower(), value.strip())
            ## remove one pair of matching quotes around the value
            if (len(value) > 1) and (value[0] == value[-1]) \
                    and (value[0] in ('"', "'")):
                value = value[1:-1]
            ## ignore empty values or keys
            if not key or not value:
                continue
            if key in ('plain_without_key', 'sign_messages'):
                ## boolean settings - anything besides yes/no is ignored
                if value.lower() == 'no':
                    result[key] = False
                elif value.lower() == 'yes':
                    result[key] = True
            elif key == 'gnupg_dir':
                ## the key directory
                result[key] = value
            ## every other setting is unknown - ignore it
        if result["gnupg_dir"].startswith('~'):
            result["gnupg_dir"] = os.path.expanduser(result["gnupg_dir"])
        elif not os.path.isabs(result["gnupg_dir"]):
            result["gnupg_dir"] = os.path.abspath(os.path.join(
                    self.list_dir, result["gnupg_dir"]))
        return result

    def get_valid_keys(self, pattern=""):
        """Return all keys matching 'pattern' that can be used for encryption.
        """
        return [ key for key in self.context.op_keylist_all(pattern, 0)
                if (key.can_encrypt != 0) ]

    def encrypt_to_keys(self, plain, keylist):
        """Encrypt the text 'plain' for all keys in 'keylist'.

        Returns the content of the cipher buffer (ascii armored, as
        requested for the context in '__init__').
        """
        plaindata = self.pyme.core.Data(plain)
        ## the output buffer has to start empty - seeding it with the
        ## plaintext could leave plaintext bytes behind the ciphertext
        cipher = self.pyme.core.Data()
        ## NOTE(review): the flag '1' is presumably "always trust" - verify
        ## against the pyme/gpgme documentation
        self.context.op_encrypt(keylist, 1, plaindata, cipher)
        cipher.seek(0, 0)
        return cipher.read()

    def _reencrypt_payload(self, cipher_text, keys, is_mime_part):
        """Return the replacement for one encrypted payload.

        cipher_text: the encrypted payload
        keys: the encryption keys of the recipient (may be empty)
        is_mime_part: True if the decrypted text should be parsed and
            serialized as a mail message again before it is used

        Without any key the result is either the decrypted text (setting
        'plain_without_key') or NOKEY_WARNING.
        """
        if (not keys) and (not self.config["plain_without_key"]):
            ## the content would be discarded anyway - skip the decryption
            return NOKEY_WARNING
        plain = self.decrypt_block(cipher_text)
        if is_mime_part:
            plain = email.message_from_string(plain).as_string()
        if keys:
            return self.encrypt_to_keys(plain, keys)
        return plain

    def reencrypt_mail(self, mail, keys):
        """Replace every encrypted payload of 'mail' in place.

        For a multipart mail each direct part is checked separately. Parts
        (or mails) without a PGP message are left untouched.
        """
        if mail.is_multipart():
            for part in mail.get_payload():
                content = part.get_payload()
                if self.is_encrypted(content):
                    part.set_payload(
                            self._reencrypt_payload(content, keys, True))
        else:
            content = mail.get_payload()
            if self.is_encrypted(content):
                mail.set_payload(
                        self._reencrypt_payload(content, keys, False))

    def is_encrypted(self, text):
        """Check if 'text' contains the start marker of a PGP message.

        The payload of a nested multipart part is a list and a message
        without any payload delivers None - both are never encrypted.
        """
        if (text is None) or isinstance(text, list):
            return False
        return text.find("-----BEGIN PGP MESSAGE-----") != -1

    def decrypt_block(self, text):
        """Decrypt 'text' and return the plain text.

        If the decryption fails, then DECRYPT_ERROR is returned instead.
        """
        cipher = self.pyme.core.Data(text)
        plain = self.pyme.core.Data()
        try:
            self.context.op_decrypt(cipher, plain)
        except self.pyme.errors.GPGMEError:
            ## decryption failed - deliver the error message instead
            plain = self.pyme.core.Data(DECRYPT_ERROR)
        plain.seek(0, 0)
        return plain.read()

    def process_mail(self, sender, recipient, mail_text):
        """Decrypt the mail and encrypt it again for the specified recipient.

        The result is handed over to QMAILQUEUE_BIN. The program exits with
        the exit code of qmail-queue as soon as one delivery fails.
        """
        mail = email.message_from_string(mail_text)
        keys = self.get_valid_keys(recipient)
        ## reencrypt the whole mail for the specific recipient
        self.reencrypt_mail(mail, keys)
        ## use tmpfile as input for qmail-queue
        ## we have to complicate things a little bit, as qmail-queue expects
        ## the envelope at file handle 1 - this is usually stdout
        tmpfile = os.tmpfile()
        try:
            tmpfile.write('F%s\0T%s\0\0' % (sender, recipient))
            tmpfile.seek(0)
            ## execute the original qmail-queue
            proc = subprocess.Popen(
                    args = [ QMAILQUEUE_BIN ],
                    shell = False,
                    stdin = subprocess.PIPE,
                    stdout = tmpfile.fileno(),
                    env = os.environ)
            proc.stdin.write(mail.as_string())
            proc.stdin.close()
        finally:
            ## tmpfile is deleted automatically after closing
            tmpfile.close()
        proc.wait()
        ## exit immediately, if qmail-queue failed once
        if proc.returncode != 0:
            sys.exit(proc.returncode)


#################### input/output stuff ###################

def internal_error(message=None, exitcode=81):
    """Write 'message' (if given) to stderr and exit with 'exitcode'."""
    if message:
        sys.stderr.write(message + "\n")
    sys.exit(exitcode)


def _parse_envelope(envelope):
    """Split a qmail-queue envelope into the sender and the recipients.

    The expected format is 'F<sender>\\0' followed by one or more
    'T<recipient>\\0' entries and a final '\\0'. The sender may be empty.

    Returns the tuple (sender, list of recipients) or None if the envelope
    does not match this format.
    """
    match = re.match('F([^\0]*)\0((?:T[^\0]+\0)+)\0$', envelope)
    if not match:
        return None
    sender, recipient_block = match.groups()
    ## remove the leading "T" of each entry and skip empty values
    recipients = [ entry[1:] for entry in recipient_block.split("\0") if entry ]
    return (sender, recipients)


def process_input_and_output(handler):
    """Read the mail from fd 0 and the envelope from fd 1 and deliver it.

    'handler.process_mail' is called once for every envelope recipient.
    See 'man qmail-queue' for details.

    Exit codes: 54 if reading fails, 91 if the envelope is malformed.
    """
    in_mail = os.fdopen(0, "r")
    in_envelope = os.fdopen(1, "rb")
    try:
        mail_text = in_mail.read()
        envelope = in_envelope.read()
    except IOError:
        ## report "Unable to read the message or envelope." (see 'man qmail-queue')
        sys.exit(54)
    ## see 'man qmail-queue' for details of the envelope format
    parsed = _parse_envelope(envelope)
    if parsed is None:
        ## report "Envelope format error." (see 'man qmail-queue')
        sys.exit(91)
    envelope_sender, envelope_recipients = parsed
    for recipient in envelope_recipients:
        handler.process_mail(envelope_sender, recipient, mail_text)


def check_for_errors():
    """Run the self-tests and report every problem to stderr.

    Returns True if no problem was found.
    """
    errors = False
    ## check the original qmail-queue binary
    if not os.access(QMAILQUEUE_BIN, os.X_OK):
        sys.stderr.write("Could not find executable qmail-queue: %s\n" % QMAILQUEUE_BIN)
        errors = True
    ## check the existence of the pyme module
    try:
        import pyme
    except ImportError:
        sys.stderr.write("Failed to import python-pyme!\n")
        errors = True
    ## check if the subprocess module is available (python >= 2.4)
    try:
        import subprocess
    except ImportError:
        sys.stderr.write("Failed to import the python module 'subprocess'! It requires python2.4.\n")
        errors = True
    return not errors


def main():
    """Run the self-tests and (if a list directory was given) the filter."""
    # reduce priority
    os.nice(5)
    if not check_for_errors():
        internal_error()
    if len(sys.argv) == 1:
        ## we were only supposed to run the self-tests - exiting successfully
        sys.exit(0)
    ## we expect exactly one parameter - the mailinglist directory
    if len(sys.argv) != 2:
        internal_error("More than one parameter (the mailinglist directory) given!")
    ## print some help if it was requested
    if sys.argv[1] == '--help':
        sys.stderr.write("Syntax: %s [MAILINGLIST_DIRECTORY]\n\n" % \
                os.path.basename(sys.argv[0]))
        sys.stderr.write("If you omit the MAILINGLIST_DIRECTORY, " \
                + "then only some self-tests are done.\n\n")
        sys.exit(0)
    ## the mailing list directory is the only parameter
    list_dir = sys.argv[1]
    ## does the mailinglist directory exist?
    if not os.access(list_dir, os.X_OK):
        internal_error("Could not access the mailinglist directory: %s" % list_dir)
    ## reencrypt the mail for each recipient
    mail_handler = MailEncryption(list_dir)
    process_input_and_output(mail_handler)


if __name__ == '__main__':
    main()