Will Your Marketing Email End Up in Spam? We Built a Tool to Find Out

Susan Sarandon
Release: 2024-12-31 09:47:10

When running email marketing campaigns, one of the biggest challenges is ensuring that your messages reach the inbox rather than the spam folder.

In this post, we'll build a tool that checks whether your email would be marked as spam, and explains why.
The tool is exposed as an API and deployed online, so that it can be integrated into your workflow.

The Secret Behind Spam Validation

Apache SpamAssassin is an open-source spam detection platform maintained by the Apache Software Foundation. Many email clients and email filtering tools use it to classify messages as spam.

It uses a multitude of rules, Bayesian filtering, and network tests to assign a spam “score” to a given email. Generally, an email scoring 5 or above is at high risk of being flagged as spam.

Because SpamAssassin is spam detection software, it can also tell you whether your own email would be flagged as spam.

SpamAssassin's scoring is transparent and well documented, so you can use it to identify exactly which aspects of your email are driving the score up, and revise your writing accordingly.

How to Validate Your Email Using SpamAssassin

SpamAssassin is designed to run on Linux systems. You'll need a Linux machine or a Docker container to install and run it.

On Debian or Ubuntu systems, install SpamAssassin with:

apt-get update && apt-get install -y spamassassin
sa-update

The sa-update command ensures that SpamAssassin’s rules are up-to-date.

Once installed, you can pipe an email message into SpamAssassin's command-line tool. The output is an annotated version of the email that includes the spam score and a report of which rules were triggered.

A typical usage might look like this:

spamassassin -t < input_email.txt > results.txt

results.txt will then contain the processed email with SpamAssassin's headers and scores, as shown below:

X-Spam-Checker-Version: SpamAssassin 4.0.0 (2022-12-13) on 254.254.254.254
X-Spam-Level: 
X-Spam-Status: No, score=0.2 required=5.0 tests=HTML_MESSAGE,
    MIME_HTML_ONLY,MISSING_MID,NO_RECEIVED,
    NO_RELAYS autolearn=no autolearn_force=no version=4.0.0

// ...

Content analysis details:   (0.2 points, 5.0 required)

 pts rule name              description
---- ---------------------- --------------------------------------------------
 0.1 MISSING_MID            Missing Message-Id: header
-0.0 NO_RECEIVED            Informational: message has no Received headers
-0.0 NO_RELAYS              Informational: message was not relayed via SMTP
 0.0 HTML_MESSAGE           BODY: HTML included in message
 0.1 MIME_HTML_ONLY         BODY: Message only has text/html MIME parts
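
If you only need the overall verdict, the X-Spam-Status header shown above can be parsed directly. The following is a minimal sketch, assuming the header keeps the "Yes/No, score=... required=..." layout from the sample output; the function name parse_spam_status is hypothetical and not part of SpamAssassin.

```python
import re

# Matches the start of the X-Spam-Status header as it appears in the sample report.
STATUS_RE = re.compile(
    r"^X-Spam-Status:\s*(Yes|No),\s*score=(-?\d+(?:\.\d+)?)\s+required=(-?\d+(?:\.\d+)?)",
    re.MULTILINE,
)

def parse_spam_status(report: str):
    """Return the verdict, score and threshold from a report, or None if absent."""
    match = STATUS_RE.search(report)
    if match is None:
        return None
    return {
        "is_spam": match.group(1) == "Yes",
        "score": float(match.group(2)),
        "required": float(match.group(3)),
    }

sample = (
    "X-Spam-Status: No, score=0.2 required=5.0 tests=HTML_MESSAGE,\n"
    "    MIME_HTML_ONLY,MISSING_MID,NO_RECEIVED,\n"
)
status = parse_spam_status(sample)
print(status)
```

The per-rule table is still the more useful part of the report, since it tells you what to change in the email.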

Wrap SpamAssassin as an API

SpamAssassin becomes far more useful when it is wrapped in an API, because an API is easy to integrate into existing workflows.

Imagine this: before you hit "Send" on your email, the content is first sent to the SpamAssassin API. The email is allowed to proceed only if the API reports that it does not meet the spam criteria.

Let’s create a simple API that accepts these email fields: subject, html_body and text_body. It will pass the fields to SpamAssassin and return the validation result.
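
As a side note, the three fields could also be assembled into a MIME message with Python's standard email package, which takes care of boundaries and encoding. This is only a sketch and not the approach used in the API code in this post; build_message is a hypothetical helper, and the addresses are the same placeholders used in the API example.

```python
from datetime import datetime, timezone
from email.message import EmailMessage
from email.utils import format_datetime

def build_message(subject: str, text_body: str, html_body: str) -> EmailMessage:
    """Assemble a multipart/alternative message from the three API fields."""
    msg = EmailMessage()
    msg["From"] = "example@example.com"    # placeholder sender
    msg["To"] = "recipient@example.com"    # placeholder recipient
    msg["Subject"] = subject
    msg["Date"] = format_datetime(datetime.now(timezone.utc))
    msg.set_content(text_body)                      # text/plain part
    msg.add_alternative(html_body, subtype="html")  # text/html part
    return msg

msg = build_message("Hello", "Plain text version", "<p>HTML version</p>")
raw = msg.as_bytes()  # bytes that could be piped to `spamassassin -t`
```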

API Example

from fastapi import FastAPI
from datetime import datetime, timezone
from email.utils import format_datetime
from pydantic import BaseModel
import subprocess

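def extract_analysis_details(text):
    """Parse the "Content analysis details" table of a SpamAssassin report.

    Returns a list of dicts with "pts", "rule_name" and "description" keys.
    """
    lines = text.splitlines()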

    start_index = None
    for i, line in enumerate(lines):
        if line.strip().startswith("pts rule"):
            start_index = i
            break

    if start_index is None:
        print("No content analysis details found.")
        return []

    data_lines = lines[start_index+2:]
    parsed_lines = []
    for line in data_lines:
        if line.strip() == "":
            break
        parsed_lines.append(line)

    results = []
    current_entry = None

    # The dashed separator row under the header gives the width of each column.
    split_line = lines[start_index+1]
    pts_split, rule_split, *rest = split_line.strip().split(" ")

    pts_start = 0
    pts_end = pts_start + len(pts_split)

    rule_start = pts_end + 1
    rule_end = rule_start + len(rule_split)

    desc_start = rule_end + 1

    for line in parsed_lines:
        pts_str = line[pts_start:pts_end].strip()
        rule_name_str = line[rule_start:rule_end].strip()
        description_str = line[desc_start:].strip()

        if pts_str == "" and rule_name_str == "" and description_str:
            if current_entry:
                current_entry["description"] += " " + description_str
        else:
            current_entry = {
                "pts": pts_str,
                "rule_name": rule_name_str,
                "description": description_str
            }
            results.append(current_entry)

    return results

app = FastAPI()

class Email(BaseModel):
    subject: str
    html_body: str
    text_body: str

@app.post("/spam_check")
def spam_check(email: Email):
    # assemble the full email
    message = f"""From: example@example.com
To: recipient@example.com
Subject: {email.subject}
Date: {format_datetime(datetime.now(timezone.utc))}
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary="__SPAM_ASSASSIN_BOUNDARY__"

--__SPAM_ASSASSIN_BOUNDARY__
Content-Type: text/plain; charset="utf-8"

{email.text_body}

--__SPAM_ASSASSIN_BOUNDARY__
Content-Type: text/html; charset="utf-8"

{email.html_body}

--__SPAM_ASSASSIN_BOUNDARY__--"""

    # Run SpamAssassin and capture the output directly
    output = subprocess.run(["spamassassin", "-t"],
                            input=message.encode('utf-8'),
                            capture_output=True)

    output_str = output.stdout.decode('utf-8', errors='replace')
    details = extract_analysis_details(output_str)
    return {"result": details}

In the code above, we defined a helper function, extract_analysis_details, to extract only the scoring reasons from the full result report. You can further improve this function, for example, by filtering out certain rules from the result.
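
As one possible version of that filtering, the sketch below drops entries whose displayed score is zero, such as the informational NO_RECEIVED and NO_RELAYS rules. The function name drop_zero_score_rules is hypothetical. Note that the report shows rounded points, so a rule listed as 0.0 may still carry a very small real score.

```python
def drop_zero_score_rules(details):
    """Keep only entries whose displayed score is non-zero."""
    kept = []
    for entry in details:
        try:
            pts = float(entry["pts"])
        except (KeyError, ValueError):
            # Keep anything we cannot interpret rather than silently hiding it.
            kept.append(entry)
            continue
        if pts != 0.0:  # float("-0.0") also compares equal to 0.0
            kept.append(entry)
    return kept

details = [
    {"pts": "0.1", "rule_name": "MISSING_MID", "description": "Missing Message-Id: header"},
    {"pts": "-0.0", "rule_name": "NO_RECEIVED", "description": "Informational: message has no Received headers"},
    {"pts": "-0.0", "rule_name": "NO_RELAYS", "description": "Informational: message was not relayed via SMTP"},
    {"pts": "0.0", "rule_name": "HTML_MESSAGE", "description": "BODY: HTML included in message"},
    {"pts": "0.1", "rule_name": "MIME_HTML_ONLY", "description": "BODY: Message only has text/html MIME parts"},
]
filtered = drop_zero_score_rules(details)
print([entry["rule_name"] for entry in filtered])
```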

The response will contain the analysis details of SpamAssassin’s results.

Let's take as an example an email whose content includes the phrase "Dear winner".

The response lists each rule that was triggered, together with its score and description. "Dear winner" is detected because the phrase is commonly used in spam emails.
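
To turn such a response into a send/don't-send decision, the rule scores can be added up and compared with the threshold. This is a rough sketch, assuming the response entries have the "pts" field produced by extract_analysis_details and the threshold of 5.0 mentioned earlier; total_score and should_send are hypothetical names. Because the report shows rounded points, the sum is only an approximation of SpamAssassin's own score.

```python
def total_score(details):
    """Sum the displayed points of all triggered rules."""
    return sum(float(entry["pts"]) for entry in details)

def should_send(details, threshold=5.0):
    """Allow sending only when the approximate score is below the threshold."""
    return total_score(details) < threshold

clean = [{"pts": "0.1"}, {"pts": "-0.0"}, {"pts": "0.0"}, {"pts": "0.1"}]
spammy = [{"pts": "3.0"}, {"pts": "2.5"}]  # made-up scores for illustration

print(should_send(clean))
print(should_send(spammy))
```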

Deploying the API Online

Running SpamAssassin requires a Linux environment with the software installed. Traditionally, you would deploy on something like an EC2 instance or a DigitalOcean droplet, which can be costly and tedious, especially if your usage is low-volume.

Serverless platforms, on the other hand, typically don't let you install system packages such as SpamAssassin.

Leapcell is a good fit for this job.

With Leapcell, you can install system packages such as SpamAssassin while keeping the service serverless: you pay only for invocations, which is usually cheaper.

Deploying the API on Leapcell is very easy. You don't have to set up any environment. Just deploy a Python image, and fill in the "Build Command" field properly.

Once deployed, you'll have an API for spam validation. Whenever the API is invoked, it will run SpamAssassin, score the email, and return the scoring details.
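
Calling the deployed endpoint from a sending workflow might look like the sketch below, which uses only the standard library. The URL is a hypothetical placeholder for wherever your service ends up, and build_spam_check_request and check_email are illustrative names; the JSON fields and the "result" key match the API defined earlier in this post.

```python
import json
import urllib.request

# Hypothetical address; replace it with the URL of your own deployment.
API_URL = "https://your-service.example.com/spam_check"

def build_spam_check_request(subject, html_body, text_body, url=API_URL):
    """Build the POST request for the /spam_check endpoint without sending it."""
    payload = json.dumps(
        {"subject": subject, "html_body": html_body, "text_body": text_body}
    ).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def check_email(subject, html_body, text_body, url=API_URL):
    """Send the request and return the list of triggered rules."""
    request = build_spam_check_request(subject, html_body, text_body, url)
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.load(response)["result"]

request = build_spam_check_request("Hello", "<p>Hi</p>", "Hi")
```

Only check_email touches the network; keeping the request construction separate makes it easy to inspect or test the payload first.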

source:dev.to