I've set up a mail server managed by a Python script via smtpd, and since I receive some spam, I decided to connect SpamAssassin to it.
Since I couldn't find any Python code that connects to SpamAssassin to get the score, I built it myself from a few pieces I found online. Here's the code:
# -*- config:utf-8 -*-
import socket, select, re, logging
from io import BytesIO

divider_pattern = re.compile(br'^(.*?)\r?\n(.*?)\r?\n\r?\n', re.DOTALL)
first_line_pattern = re.compile(br'^SPAMD/[^ ]+ 0 EX_OK$')


# @see https://github.com/slimta/python-slimta/blob/master/slimta/policy/spamassassin.py
class SpamAssassin(object):
    def __init__(self, message, timeout=15):
        self.score = None
        self.symbols = None

        # Connecting
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.settimeout(timeout)
        client.connect(('127.0.0.1', 783))

        # Sending
        client.sendall(self._build_message(message))
        client.shutdown(socket.SHUT_WR)

        # Reading
        resfp = BytesIO()
        while True:
            ready = select.select([client], [], [], timeout)
            if ready[0] is None:
                # Kill with Timeout!
                logging.info('[SpamAssassin] - Timeout ({0}s)!'.format(str(timeout)))
                break

            data = client.recv(4096)
            if data == b'':
                break

            resfp.write(data)

        # Closing
        client.close()
        client = None

        self._parse_response(resfp.getvalue())

    def _build_message(self, message):
        reqfp = BytesIO()
        data_len = str(len(message)).encode()
        reqfp.write(b'SYMBOLS SPAMC/1.2\r\n')
        reqfp.write(b'Content-Length: ' + data_len + b'\r\n')
        reqfp.write(b'User: cx42\r\n\r\n')
        reqfp.write(message)
        return reqfp.getvalue()

    def _parse_response(self, response):
        if response == b'':
            logging.info("[SPAM ASSASSIN] Empty response")
            return None

        match = divider_pattern.match(response)
        if not match:
            logging.error("[SPAM ASSASSIN] Response error:")
            logging.error(response)
            return None

        first_line = match.group(1)
        headers = match.group(2)
        body = response[match.end(0):]

        # Checking response is good
        match = first_line_pattern.match(first_line)
        if not match:
            logging.error("[SPAM ASSASSIN] invalid response:")
            logging.error(first_line)
            return None

        self.symbols = [s.decode('ascii').strip() for s in body.strip().split(',')]

        headers = headers.replace(' ', '').replace(':', ';').replace('/', ';').split(';')
        self.score = float(headers[2])

    def get_score(self):
        return self.score

    def get_symbols(self):
        return self.symbols

    def is_spam(self, level=5):
        return self.score is None or self.score >= level
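For reference, the score extraction in _parse_response takes the third field after splitting the header block, so it depends on the Spam header being the first header in the response. Below is a minimal sketch of a position-independent parse, assuming the header has the form "Spam: True ; 15.3 / 5.0" as I understand it from the spamd protocol. The names SPAM_HEADER and parse_spam_header are made up for this illustration and are not part of the class above.

```python
import re

# Assumed header shape: "Spam: True ; 15.3 / 5.0" (flag ; score / threshold).
SPAM_HEADER = re.compile(
    br'^Spam:\s*(True|False|Yes|No)\s*;'
    br'\s*(-?\d+(?:\.\d+)?)\s*/\s*(-?\d+(?:\.\d+)?)\s*$',
    re.IGNORECASE | re.MULTILINE,
)


def parse_spam_header(headers):
    """Return (is_spam, score, threshold), or None if no Spam header is found.

    `headers` is the raw header block as bytes; the Spam header may appear
    on any line of it.
    """
    match = SPAM_HEADER.search(headers)
    if match is None:
        return None
    flag, score, threshold = match.groups()
    return (
        flag.lower() in (b'true', b'yes'),
        float(score.decode('ascii')),
        float(threshold.decode('ascii')),
    )
```

Returning None when the header is missing keeps "no verdict" distinct from "score below the threshold", which makes it easier to log the two cases separately.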
In my server script, I have the following part to check for spam:
# data is the mail body received from smtpd
assassin = SpamAssassin(data)
if assassin.is_spam():
    logging.info('SpamAssassin rejected. Score of {0}'.format(assassin.get_score()))
    return '554 Command rejected for policy reasons.'
When the code doesn't enter the if branch, the mail is sent.
The big problem is that some emails are sent even though SpamAssassin considers them spam.
I know this because I built a second script that loads the Postfix queue (via postqueue -j, for JSON output) and runs a SpamAssassin check on each message using my code above. Quite a few emails are then detected as spam.
(To avoid showing too much code, the script that loads and cleans the Postfix queue is here.)
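To give an idea of the queue-loading step without the full script, here is a minimal sketch (not the linked script itself). It assumes that postqueue -j prints one JSON object per line with fields such as queue_name and queue_id; the function names and the sample values are invented for the illustration.

```python
import json


def parse_postqueue_json(output):
    """Parse the text printed by `postqueue -j` (one JSON object per line)."""
    entries = []
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines
        entries.append(json.loads(line))
    return entries


def queue_ids(entries, queue_name=None):
    """Return the queue IDs, optionally limited to one queue (e.g. 'deferred')."""
    return [
        entry['queue_id']
        for entry in entries
        if queue_name is None or entry.get('queue_name') == queue_name
    ]


# Made-up sample in the assumed output format, two messages.
sample = (
    '{"queue_name": "deferred", "queue_id": "3F2A41C0B", '
    '"sender": "a@example.com", "recipients": [{"address": "b@example.com"}]}\n'
    '{"queue_name": "active", "queue_id": "7D9E02F11", '
    '"sender": "c@example.com", "recipients": [{"address": "d@example.com"}]}\n'
)
entries = parse_postqueue_json(sample)
deferred = queue_ids(entries, queue_name='deferred')
```

In a real run the text would come from something like subprocess.check_output(['postqueue', '-j']), and the content of each message would still have to be fetched separately (for example with postcat -q) before handing it to SpamAssassin.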
I don't know what is wrong with the code shown here, or how my Python code can allow spam to be sent.
I checked the logs and I don't have any exceptions from my code (like Timeouts from SpamAssassin or anything else).
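Since the logs show no timeout entries, it may be worth confirming that the timeout branch in the read loop can fire at all. The loop tests ready[0] is None, while select.select returns three lists. The sketch below uses a local socket pair as a stand-in for the spamd connection (an assumption made only so it runs without a server) and records what the call returns when nothing is readable; wait_readable is a hypothetical helper, not part of the class.

```python
import select
import socket


def wait_readable(sock, timeout):
    """Return True if sock becomes readable within `timeout` seconds."""
    readable, _, _ = select.select([sock], [], [], timeout)
    # On timeout, select() returns three empty lists, so test for emptiness.
    return bool(readable)


a, b = socket.socketpair()  # stand-in for the client/spamd connection
try:
    # Nothing has been sent yet, so this select() call times out.
    idle = select.select([a], [], [], 0.1)
    timed_out_is_none = idle[0] is None   # the test used in the read loop
    timed_out_is_empty = idle[0] == []    # what a timeout looks like

    # Once the peer sends data, the socket is reported as readable.
    b.sendall(b'x')
    has_data = wait_readable(a, 0.1)
finally:
    a.close()
    b.close()
```

If the emptiness test is used in the read loop instead, a select timeout would reach the logging call rather than falling through to recv.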
To me, the issue is that the condition if assassin.is_spam() returns False when it should return True in some cases, but I don't know how, why, or when, so I'm turning to you for help.
My theories are:
- Maybe the socket is reusing some cached result that returned False for a previous email, so the new email is not actually checked.
- Something odd is going on with the socket handling in the spamd.py file, since it's the only place that doesn't seem to work.
- Maybe a concurrency issue? Since there are many requests made to the server, maybe one socket is opened for multiple incoming requests and the first result is read for all of them, accepting mail that shouldn't be accepted.