334 lines
11 KiB
Python
Executable file
334 lines
11 KiB
Python
Executable file
#!/usr/bin/env python2.4
|
|
#
|
|
# This file is part of gpgpy-ezmlm - an encryption filter for the
|
|
# ezmlm-idx mailinglist manager.
|
|
#
|
|
# Copyright 02007 Sense.Lab e.V. <info@senselab.org>
|
|
#
|
|
# This script decrypts an incoming mail and encrypts it for each recipient
|
|
# separately. Afterwards it calls qmail-queue for each recipient.
|
|
# It is meant as a wrapper around qmail-queue. See 'man qmail-queue' for
|
|
# details of the qmail-queue interface.
|
|
#
|
|
# Syntax:
|
|
# gpgpy-ezmlm-encrypt [MAILINGLIST_DIRECTORY]
|
|
#
|
|
# If no MAILINGLIST_DIRECTORY is given, then it will only run some self-tests.
|
|
#
|
|
# Environment settings:
|
|
# - QMAILQUEUE should contain the path of the qmail-queue program (or a
|
|
# substitute - e.g. for spam filtering) - otherwise the default location
|
|
# /var/qmail/bin/qmail-queue is used
|
|
#
|
|
#
|
|
# gpgpy-ezmlm is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# gpgpy-ezmlm is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with the CryptoBox; if not, write to the Free Software
|
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
|
|
|
|
# TODO:
|
|
# implement 'sign_messages'
|
|
# implement 'hide_subject'
|
|
|
|
|
|
import sys, os, re
|
|
import email
|
|
import email.Parser
|
|
import email.Message
|
|
import subprocess
|
|
|
|
|
|
############### some default settings ##################
|
|
|
|
## we will finally deliver via the original qmail-queue
|
|
if os.environ.has_key("QMAILQUEUE"):
|
|
QMAILQUEUE_BIN = os.environ["QMAILQUEUE"]
|
|
else:
|
|
QMAILQUEUE_BIN = '/var/qmail/bin/qmail-queue'
|
|
|
|
## the list specific configuration file
|
|
CONF_FILE = 'conf-gpgpy'
|
|
|
|
## default settings - the setting keys must always be lower case!
|
|
DEFAULT_SETTINGS = {
|
|
"plain_without_key": False,
|
|
"sign_messages": False,
|
|
"gnupg_dir": ".gnupg"
|
|
}
|
|
|
|
## put this warning into a mail instead of the encrypted content, in case
|
|
## the key of the recipient is missing
|
|
NOKEY_WARNING = "No valid encryption key found!"
|
|
|
|
|
|
## in case of 'cannot decrypt' errors
|
|
DECRYPT_ERROR = "Failed to decrypt the mail that was sent to the mailinglist. Please check the configuration of the mailinglist. Maybe the sender used an invalid encryption key?"
|
|
|
|
|
|
################ the encryption handler ################
|
|
|
|
class MailEncryption:
|
|
|
|
def __init__(self, list_dir):
|
|
self.list_dir = os.path.abspath(list_dir)
|
|
## does the list directory exist?
|
|
if not os.access(self.list_dir, os.X_OK):
|
|
internal_error("Could not access mailinglist directory: %s" \
|
|
% self.list_dir)
|
|
## read the config file
|
|
self.config = self.read_config()
|
|
## does the gnupg directory exist?
|
|
if not os.access(self.config["gnupg_dir"], os.X_OK):
|
|
internal_error("Could not access gnupg directory: %s" \
|
|
% self.config["gnupg_dir"])
|
|
## set GPGHOME environment - this should be used by pyme
|
|
os.environ["GNUPGHOME"] = self.config["gnupg_dir"]
|
|
## we _must_ import pyme after configuring the GNUPGHOME setting
|
|
import pyme.core
|
|
self.pyme = pyme
|
|
self.context = self.pyme.core.Context()
|
|
self.context.set_armor(1)
|
|
|
|
|
|
def read_config(self):
|
|
"""Read the config file.
|
|
|
|
If a value is not defined in the config file, then the default value
|
|
is used.
|
|
Any line that does not really look like a config setting is ignored.
|
|
Any unknown configuration settings are ignored, too.
|
|
"""
|
|
result = DEFAULT_SETTINGS
|
|
## retrieve the absolute path of the configuration file
|
|
## by default it is relative to the mailinglist's directory
|
|
if os.path.isabs(CONF_FILE):
|
|
conf_file = CONF_FILE
|
|
else:
|
|
conf_file = os.path.join(self.list_dir, CONF_FILE)
|
|
if not os.access(conf_file, os.R_OK):
|
|
internal_error("Could not read gpgpy-ezmlm config file: %s" % conf_file)
|
|
## read all lines of the configuration file
|
|
all_lines = [ e.strip() for e in file(conf_file).readlines() ]
|
|
for line in all_lines:
|
|
## ignore empty lines, comments and lines without "="
|
|
if (not line) or e.startswith("#") or (e.find("=") == -1):
|
|
continue
|
|
key, value = line.split("=", 1)
|
|
## turn everything to lower case and remove surrounding whitespace
|
|
key, value = (key.strip().lower(), value.strip())
|
|
if (len(value) > 1) and \
|
|
((value.startswith('"') and value.endswith('"')) \
|
|
or (value.startswith("'") and value.endswith("'"))):
|
|
value = value[1:-1]
|
|
## ignore empty values or keys
|
|
if not key or not value:
|
|
continue
|
|
## check boolean values
|
|
if key in ['plain_without_key', 'sign_messages']:
|
|
if value.lower() == 'no':
|
|
result[key] = False
|
|
elif value.lower() == 'yes':
|
|
result[key] = True
|
|
else:
|
|
continue
|
|
## process the key directory
|
|
elif key == 'gnupg_dir':
|
|
result[key] = value
|
|
## unknown setting - ignore
|
|
else:
|
|
continue
|
|
if result["gnupg_dir"].startswith('~'):
|
|
result["gnupg_dir"] = os.path.expanduser(result["gnupg_dir"])
|
|
elif not os.path.isabs(result["gnupg_dir"]):
|
|
result["gnupg_dir"] = os.path.abspath(os.path.join(
|
|
self.list_dir, result["gnupg_dir"]))
|
|
return result
|
|
|
|
|
|
def get_valid_keys(self, pattern=""):
|
|
return [ key for key in self.context.op_keylist_all(pattern, 0)
|
|
if (key.can_encrypt != 0) ]
|
|
|
|
|
|
def encrypt_to_keys(self, plain, keylist):
|
|
plaindata = self.pyme.core.Data(plain)
|
|
cipher = self.pyme.core.Data(plain)
|
|
self.context.op_encrypt(keylist, 1, plaindata, cipher)
|
|
cipher.seek(0, 0)
|
|
return cipher.read()
|
|
|
|
|
|
def reencrypt_mail(self, mail, keys):
|
|
if mail.is_multipart():
|
|
payloads = mail.get_payload()
|
|
index = 0
|
|
while index < len(payloads):
|
|
if self.is_encrypted(payloads[index].get_payload()):
|
|
decrypted_part = email.message_from_string(
|
|
self.decrypt_block(payloads[index].get_payload()))
|
|
if keys:
|
|
payloads[index].set_payload(self.encrypt_to_keys(
|
|
decrypted_part.as_string(), keys))
|
|
else:
|
|
if self.config["plain_without_key"]:
|
|
payloads[index].set_payload(decrypted_part.as_string())
|
|
else:
|
|
payloads[index].set_payload(NOKEY_WARNING)
|
|
index += 1
|
|
else:
|
|
if self.is_encrypted(mail.get_payload()):
|
|
if keys:
|
|
mail.set_payload(self.encrypt_to_keys(
|
|
self.decrypt_block(mail.get_payload()), keys))
|
|
else:
|
|
if self.config["plain_without_key"]:
|
|
mail.set_payload(self.decrypt_block(mail.get_payload()))
|
|
else:
|
|
mail.set_payload(NOKEY_WARNING)
|
|
|
|
|
|
|
|
def is_encrypted(self, text):
|
|
return text.find("-----BEGIN PGP MESSAGE-----") != -1
|
|
|
|
|
|
def decrypt_block(self, text):
|
|
cipher = self.pyme.core.Data(text)
|
|
plain = self.pyme.core.Data()
|
|
try:
|
|
self.context.op_decrypt(cipher, plain)
|
|
except self.pyme.errors.GPGMEError:
|
|
## decryption failed - we do not do anything
|
|
plain = self.pyme.core.Data(DECRYPT_ERROR)
|
|
plain.seek(0, 0)
|
|
return plain.read()
|
|
|
|
|
|
def process_mail(self, sender, recipient, mail_text):
|
|
"""Decrypt the mail and encrypt it again for the specified recipient
|
|
"""
|
|
mail = email.message_from_string(mail_text)
|
|
keys = self.get_valid_keys(recipient)
|
|
## reencrypt the whole mail for the specific recipient
|
|
self.reencrypt_mail(mail, keys)
|
|
## use tmpfile as input for qmail-queue
|
|
## we have to complicate things a little bit, as qmail-queue expects input
|
|
## at file handle 1 - this is usually stdout
|
|
tmpfile = os.tmpfile()
|
|
tmpfile.write('F%s\0T%s\0\0' % (sender, recipient))
|
|
tmpfile.seek(0)
|
|
## execute the original qmail-queue
|
|
proc = subprocess.Popen(
|
|
shell = False,
|
|
stdin = subprocess.PIPE,
|
|
stdout = tmpfile.fileno(),
|
|
env = os.environ,
|
|
args = [ QMAILQUEUE_BIN ] )
|
|
proc.stdin.write(mail.as_string())
|
|
## tmpfile is deleted automatically after closing
|
|
tmpfile.close()
|
|
proc.stdin.close()
|
|
proc.wait()
|
|
## exit immediately, if qmail-queue failed once
|
|
if proc.returncode != 0:
|
|
sys.exit(proc.returncode)
|
|
|
|
|
|
|
|
#################### input/output stuff ###################
|
|
|
|
def internal_error(message=None, exitcode=81):
|
|
if message:
|
|
sys.stderr.write(message + "\n")
|
|
sys.exit(exitcode)
|
|
|
|
|
|
def process_input_and_output(handler):
|
|
"""Read mail content and the envelopement information from fd1 and fd2
|
|
|
|
see 'main qmail-queue' for details
|
|
"""
|
|
in_mail = os.fdopen(0, "r")
|
|
in_envelope = os.fdopen(1, "rb")
|
|
## try to read mail and
|
|
try:
|
|
mail_text = in_mail.read()
|
|
envelope = in_envelope.read()
|
|
except IOError:
|
|
## report "Unable to read the message or envelope." (see 'man qmail-queue')
|
|
sys.exit(54)
|
|
## see 'man qmail-queue' for details of the envelope format
|
|
envelope_addresses = re.match(u'F(.+?)\0((?:T[^\0]+?\0)+)\0$', envelope)
|
|
if not envelope_addresses or (len(envelope_addresses.groups()) != 2):
|
|
## report "Envelope format error." (see 'man qmail-queue')
|
|
sys.exit(91)
|
|
## the first match is the sender address
|
|
envelope_sender = envelope_addresses.groups()[0]
|
|
## the second match is the list of all recipients
|
|
## each preceded by "T" and followed by "\0"
|
|
envelope_recipients = envelope_addresses.groups()[1].split("\0")
|
|
## remove leading "T" and skip empty values
|
|
envelope_recipients = [ e[1:] for e in envelope_recipients if e ]
|
|
for recipient in envelope_recipients:
|
|
handler.process_mail(envelope_sender, recipient, mail_text)
|
|
|
|
|
|
def check_for_errors():
|
|
errors = False
|
|
## check the original qmail-queue binary
|
|
if not os.access(QMAILQUEUE_BIN, os.X_OK):
|
|
sys.stderr.write("Could not find executable qmail-queue: %s\n" % QMAILQUEUE_BIN)
|
|
errors = True
|
|
## check the existence of the pyme module
|
|
try:
|
|
import pyme
|
|
except ImportError:
|
|
sys.stderr.write("Failed to import python-pyme!\n")
|
|
errors = True
|
|
## check if the subprocess module is available (python >= 2.4)
|
|
try:
|
|
import subprocess
|
|
except ImportError:
|
|
sys.stderr.write("Failed to import the python module 'subprocess'! It requires python2.4.\n")
|
|
errors = True
|
|
return errors == False
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# reduce priority
|
|
os.nice(5)
|
|
if not check_for_errors():
|
|
internal_error()
|
|
if len(sys.argv) == 1:
|
|
## we were only supposed to run the self-tests - exiting successfully
|
|
sys.exit(0)
|
|
## we expect exactly one parameter - the mailinglist directory
|
|
if len(sys.argv) != 2:
|
|
internal_error("More than one parameter (the mailinglist directory) given!")
|
|
## print some help if it was requested
|
|
if sys.argv[1] == '--help':
|
|
sys.stderr.write("Syntax: %s [MAILINGLIST_DIRECTORY]\n\n" % \
|
|
os.path.basename(sys.argv[0]))
|
|
sys.stderr.write("If you omit the MAILINGLIST_DIRECTORY, " \
|
|
+ "then only some self-tests are done.\n\n")
|
|
sys.exit(0)
|
|
## retrieve the mailing list directory by reading the dotqmail file
|
|
list_dir = sys.argv[1]
|
|
## does the mailinglist directory exist?
|
|
if not os.access(list_dir, os.X_OK):
|
|
internal_error("Could not access the mailinglist directory: %s" % list_dir)
|
|
## reencrypt the mail for each recipient
|
|
mail_handler = MailEncryption(list_dir)
|
|
process_input_and_output(mail_handler)
|
|
|