#!/usr/bin/env python3 # -*- coding: utf-8 -*- # This program is free software. It comes without any warranty, to # the extent permitted by applicable law. You can redistribute it # and/or modify it under the terms of the Do What The Fuck You Want # To Public License, Version 2, as published by Sam Hocevar. See # http://sam.zoy.org/wtfpl/COPYING for more details. # {{{ Constants _NAME = 'pmailq' _HELP = "[OPTIONS] [ list | parse | del ]" _DESC = "%s postfix mail queue manager" % _NAME _VERSION = '0.6' _AUTHOR = 'Emmanuel Bouthenot ' MAILQ = "postqueue -p" DELQ = "postsuper -d" # }}} # {{{ Imports import sys import os import subprocess import fcntl import select import fnmatch import argparse import re # }}} # {{{ Class Proc class Proc: def run(self, command): proc = subprocess.Popen(command, bufsize=1, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) proc.stdin.close() outfile = proc.stdout outfd = outfile.fileno() errfile = proc.stderr errfd = errfile.fileno() # avoid deadlocks self.set_no_block(outfd) self.set_no_block(errfd) outdata = errdata = bytes() outeof = erreof = False while True: # wait for activity ready = select.select([outfd, errfd], [], []) if outfd in ready[0]: outchunk = outfile.read() if outchunk == b'': outeof = True outdata = outdata + outchunk if errfd in ready[0]: errchunk = errfile.read() if errchunk == b'': erreof = True errdata = errdata + errchunk if outeof and erreof: break # give a little time for buffers to fill select.select([],[],[],.1) err = proc.wait() return err, outdata.decode('utf-8'), errdata.decode('utf-8') def set_no_block(self, fd): fl = fcntl.fcntl(fd, fcntl.F_GETFL) try: fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NDELAY) except AttributeError: fcntl.fcntl(fd, fcntl.F_SETFL, fl | fcntl.FNDELAY) # }}} # {{{ MailQueue class MailQueue: def __init__(self): self.mailqueue = [] self.filters = { 'email' : None, 'msg' : None, 'lowsize' : 0, 'upsize' : 0, 'active' : False, 'hold' : False } self.parse() def add_filter(self, key, value): self.filters[key] = value def parse(self): p_ret, p_stdout, p_stderr = Proc().run(MAILQ) # mail system down ? if p_ret != 0: sys.stderr.write ("ERR : %s\n" % ", ".join(p_stderr.strip().split("\n"))) sys.exit (-1) buffer = p_stdout.strip().split('\n'); # checking empty mail queue if len(buffer)>0 and buffer[0].strip() == "Mail queue is empty": sys.stderr.write ("INFO : %s\n" % buffer[0].strip()) return None # skip first and last line buffer = "\n".join(buffer[1:-1]).strip() for block in buffer.split("\n\n"): lines = block.split("\n") headers = lines[0].split(' ') # squeeze repeated spaces while '' in headers: headers.remove('') queue = [] dest = [] info = "" for expl in lines[1:]: expl = expl.strip() if expl.startswith("(") and expl.endswith(")"): if info == "": info = expl[1:len(expl)-1] if dest != []: queue.append({ "info" : info , "dest" : dest }) dest = [] info = expl[1:len(expl)-1] else: dest.append(expl.lower()) if dest != []: queue.append({ "info" : info , "dest" : dest }) self.mailqueue.append({ "id" : headers[0].rstrip("*!"), "active" : headers[0].endswith("*"), "hold" : headers[0].endswith("!"), "size" : headers[1], "date" : " ".join(headers[2:5]), "queue" : queue }) def check(self, size, active, hold, dest, infos): if self.filters['email'] != None: match = False for e in dest: if fnmatch.fnmatch(e.lower(), self.filters['email'].lower()): match = True if not match: return False if self.filters['msg'] != None: match = False for i in infos: if fnmatch.fnmatch(i.lower(), self.filters['msg'].lower()): match = True if not match: return False if self.filters['active'] and not active: return False if self.filters['hold'] and not hold: return False if self.filters['lowsize'] != 0 and int(size) > self.filters['lowsize']: return False if self.filters['upsize'] != 0 and int(size) < self.filters['upsize']: return False return True def cmd_list(self): for m in self.mailqueue: out = "%s\n" % m['id'] out += " -date: %s\n" % m['date'] out += " -size: %s\n" % m['size'] out += " -active: %s\n" % str(m['active']) out += " -hold: %s\n" % str(m['hold']) out += " -to:\n" to = [] i = [] for n in m['queue']: i.append(n['info']) to += n['dest'] out += " + %s : [%s]\n" % (",".join(n['dest']), n['info']) if self.check(m['size'], m['active'], m['hold'], to, i): print(out) def cmd_parse(self): for m in self.mailqueue: e = [] i = [] for n in m['queue']: i.append(n['info']) for o in n['dest']: e.append(o) if self.check(m['size'], m['active'], m['hold'], e, i): print("%s|%s|%s|%d|%d|%s" % (m['id'], m['date'], m['size'], int(m['active']), int(m['hold']), ",".join(n['dest']))) def cmd_del(self): for m in self.mailqueue: e = [] i=[] for n in m['queue']: i.append(n['info']) for o in n['dest']: e.append(o) if self.check(m['size'], m['active'], m['hold'], e, i): p_ret, _, p_stderr = Proc().run('%s %s' % (DELQ, m['id'])) if p_ret != 0: print("deleting %s [FAILED] (%s)" % (m['id'], re.sub('\s+', ' ', p_stderr).strip())) else: print("deleting %s [OK]" % m['id']) # }}} # {{{ main def main(): parser = argparse.ArgumentParser(prog=_NAME, description=_DESC) parser.add_argument( '-v', '--version', action='version', version=_VERSION) parser.add_argument( '-e', '--email', dest='email', default=None, metavar='PATTERN', help='select entries in queue with email matching PATTERN') parser.add_argument( '-m', '--msg', dest='msg', default=None, metavar='PATTERN', help='select entries in queue with error message matching PATTERN') parser.add_argument( '-l', '--size-lower', dest='lowsize', default=0, type=int, metavar='SIZE', help='select entries in queue with size lower than SIZE bytes') parser.add_argument( '-u', '--size-upper', dest='upsize', default=0, type=int, metavar='SIZE', help='select entries in queue with size upper than SIZE bytes') parser.add_argument( '-a', '--active', dest='active', default=False, action='store_true', help='select "active" entries in queue (default: no)') parser.add_argument( '-o', '--hold', dest='hold', default=False, action='store_true', help='select "on hold" entries in queue (default: no)') subparsers = parser.add_subparsers(dest='action') subparsers.add_parser( 'list', help='Show a detailed listing of the selected entries') subparsers.add_parser( 'parse', help='Show a listing of the selected entries in a machine readable format') subparsers.add_parser( 'del', help='Delete the selected entries') options = parser.parse_args() if options.action is None: options.action = 'list' m = MailQueue() m.add_filter("email", options.email) m.add_filter("msg", options.msg) m.add_filter("lowsize", options.lowsize) m.add_filter("upsize", options.upsize) m.add_filter("active", options.active) m.add_filter("hold", options.hold) if 'cmd_' + options.action not in dir(m): parser.print_help() else: getattr(m, 'cmd_' + options.action)() if __name__ == "__main__": main() # }}} # vim: foldmethod=marker foldlevel=0 foldenable