본문 바로가기
## 오래된 게시글 (미관리) ##/Python (Linux)

25. Python - 분류 연습문제 4 [ 스팸 필터 / 스팸 분류기 ]

by #Glacier 2018. 12. 5.

안녕하세요. 오늘은 분류의 마지막 연습문제인 스팸 분류기를 만들어봅니다.


먼저, 데이터를 다운로드 하는데, 저자가 제공한 코드 그대로 쓰면 ReadError : file could not opened succesfully 오류가 납니다.

(저만 그런가요. ham은 되는데, spam은 설치도 열리지도 않는걸로 보면, 뭔가 문제가 있습니다.)


그래서 저는 수동으로 설치하고 손으로 풀었어요. 혹시 몰라 코드 써놓을 테니 되시는 분은 그냥 쓰셔도 무방합니다.


# 데이터 다운로드

import os

import tarfile

from six.moves import urllib


DOWNLOAD_ROOT = "http://spamassassin.apache.org/old/publiccorpus/"

HAM_URL = DOWNLOAD_ROOT + "20030228_easy_ham.tar.bz2"

SPAM_URL = DOWNLOAD_ROOT + "20030228_spam.tar.bz2"

SPAM_PATH = os.path.join("/root","datasets", "spam")


def fetch_spam_data(spam_url=SPAM_URL, spam_path=SPAM_PATH):

    if not os.path.isdir(spam_path):

        os.makedirs(spam_path)

    for filename, url in (("ham.tar.bz2", HAM_URL), ("spam.tar.bz2", SPAM_URL)) :

        path = os.path.join(spam_path, filename)

        if not os.path.isfile(path):

            urllib.request.urlretrieve(url, path)

        print(url, path)

        tar_bz2_file = tarfile.open(path)

        tar_bz2_file.extractall(path=SPAM_PATH)

        tar_bz2_file.close()


#열기

fetch_spam_data()



기존의 경로가 SPAM_PATH인데 이 것은 지정폴더 까지이므로, 압축이 풀린 후의 easy_ham, spam 폴더로 경로 지정합니다.

HAM_DIR = os.path.join(SPAM_PATH, "easy_ham")

SPAM_DIR = os.path.join(SPAM_PATH, "spam")


폴더 안에 있는 파일들의 이름이 20글자가 넘어가면 For문을 통해 주루르륵 들어갑니다.


ham_filenames = [name for name in sorted(os.listdir(HAM_DIR)) if len(name) > 20 ]

spam_filenames = [name for name in sorted(os.listdir(SPAM_DIR)) if len(name) > 20 ]


import email

import email.policy 


is_spam은 스팸인가 아닌가에 대한 구분자/  filename for문으로 아까 경로지정해서 받아온 파일명들이 들어갑니다.

directory는 is_spam이 TRUE이면 spam, FALSE이면 easy_ham으로 지정합니다.

"rb"는 read, binary 즉 binary mode로 읽는다는 말 입니다. 그리고 이를 f로 지정했구요.


def load_email(is_spam, filename, spam_path = SPAM_PATH) :

   directory = "spam" if is_spam else "easy_ham"

   with open(os.path.join(spam_path, directory, filename), "rb") as f :

      return email.parser.BytesParser(policy = email.policy.default).parse(f)


ham_emails = [load_email(is_spam = False, filename = name) for name in ham_filenames]

spam_emails = [load_email(is_spam = True, filename= name) for name in spam_filenames]


print(ham_emails[1].get_content().strip() 을 통해서 데이터의 내용을 살펴봅니다.



print(spam_emails[6].get_content().strip()) 으로 스팸 메일도 봅니다.



어떤 이메일은 이미지나 첨부 파일을 가지는 multipart 입니다.

메일에 포함되어 있을 수 있으며 어떤 파일들이 있는지 살펴봅니다.

================================

def get_email_structure(email) :

    if isinstance(email, str) :

        return email

    payload = email.get_payload()

    if isinstance(payload, list):

        return "multipart({})".format(", ".join([

            get_email_structure(sub_email)

            for sub_email in payload

        ]))

    else :

        return email.get_content_type()

================================

from collections import Counter


def structures_counter(emails):

    structures = Counter()

    for email in emails:

        structure = get_email_structure(email)

        structures[structure] += 1

    return structures

================================


structures_counter(ham_emails).most_common()


================================


structures_countere(spam_emails).most_common()

햄 메일은 plain 텍스트가 거의 대부분이며, 스팸은 HTML일 경우가 많다. 

그리고, 66개의 메일이 pgp로 서명되어 있지만, 스팸메일에는 없는 것을 알 수 있습니다.


이제 메일의 헤더를 봅니다.

for header, value in spam_emails[0].items():

    print(header, ":", value)



이렇게 헤더에도 유용한 정보가 많지만, 이번엔 Subject Header만 다뤄봅니다.


spam_emails[0]["Subject"]


데이터를 더 살펴보기 전에, 훈련 세트와 테스트 세트로 나눕니다.


import numpy as np

from sklearn.model_selection import train_test_split


X = np.array(ham_emails + spam_emails)

y = np.array([0] * len(ham_emails) + [1] * len(spam_emails))


X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


이제, 전처리 함수를 작성합니다. HTML을 일반 텍스트로 변환하는 함수가 필요한데, BeautifulSoup 라이브러리를 사용하는 것이 좋지만 의존성을 줄이기 위해 정규식을 사용하여 대강 만듭니다.


<head> 섹션을 삭제하고 모든 <a>태그를 HYPERLINK 문자로 바꾸고, HTML 태그를 제거하고 텍스트만 남깁니다.

보기 편하게 여러 개의 개행 문자를 하나로 만들고, (&gt; 나 &nbsp; 같은 것들) html 엔티티를 복원합니다.


import re

from html import unescape


def html_to_plain_text(html):

    text =re.sub('<head.*?>.*?</head>', '', html, flags=re.M | re.S | re.I)

    text =re.sub('<a\s.*?>', '  HYPERLINK', text, flags=re.M | re.S |  re.I)

    text =re.sub('<.*?>', '', text, flags=re.M | re.S)

    text =re.sub(r'(\s*\n)+', '\n', text, flags=re.M | re.S)

    return unescape(text)


이제 잘 작동하는지 보기 위해 스팸메일을 먼저 봅니다.


html_spam_emails = [email for email in X_train[y_train==1] if get_email_structure(email) == 'text/html' ]

sample_html_spam = html_spam_emails[7]

print(sample_html_spam.get_content().strip()[:1000], "...")



변환된 것을 봅니다.

print(html_to_plain_text(sample_html_spam.get_content())[:1000], "...")


이제, 포맷에 상관없이 이메일을 입력받아 일반 텍스트를 출력하는 함수를 만듭니다.


def email_to_text(email) :

    html = None

    for part in email.walk():

        ctype = part.get_content_type()

        if not ctype in ("text/plain", "text/html"):

            continue

        try :

            content = part.get_content()

        except :

            content = str(part.get_payload())

        if ctype == 'text/plain':

            return content

        else :

            html = content

    if html :

        return html_to_plain_text(html)


print(email_to_text(sample_html_spam)[:100], "...")


잘 작동하는 것을 볼 수 있습니다. 이제, 어간 추출을 위해 자연어 처리 툴킷(NLTK)을 설치해야 합니다.

먼저 virtualenv 환경을 활성화하거나, 별도의 환경이 없다면 Admin 권한이 필요할 수 있습니다. 아니면 --user 옵션을 사용합니다.

터미널에서 먼저 pip install nltk 로 인스톨합니다.


try :

    import nltk

    

    stemmer = nltk.PorterStemmer()

    for word in ("Computations", "Computation", "Computing", "Computed", "Compute", "Compulsive"):

        print(word, "=>", stemmer.stem(word))

except ImportError:

    print("Error: stemming requires the NLTK module.")

    stemmer = None



인터넷 주소는 "URL" 문자로 바꾸는데 정규식을 이용해 하드코딩 할 수도 있지만, urlextract 라이브러리를 사용합니다.

pip install urlextract


try:

    import urlextract #루트 도메인 이름을 다운로드하기 위해 인터넷 연결이 필요할지 모른다.

    

    url_extractor = urlextract.URLExtract()

    print(url_extractor.find_urls("Will it detect github.com and https://youtu.be/7Pq-S557XQU?t=3m32s"))

except ImportError:

    print("Error: replacing URLs requires the urlextract module.")

    url_extractor = None



이들을 모두 하나의 변환기로 만들어서 이메일을 단어 카운트로 바꿉니다. 파이썬의 split()메서드를 사용하면 구둣점과 단어 경계를 기준으로 문장을 단어로 바꿀 수 있고, 이 방법이 많은 언어에 통하지만 전부는 아닙니다. 


예를 들어 중국어와 일본어는 일반적으로 단어 사이에 공백을 두지 않고, 베트남어는 음절 사이에 공백을 두기도 합니다. 하지만 여기에서는 영어이기 때문에 문제가 없습니다. (한\-9글은 어..떡하..지?)


from sklearn.base import BaseEstimator, TransformerMixin


class EmailToWordCounterTransformer(BaseEstimator, TransformerMixin):

    def __init__(self, strip_headers=True, lower_case=True, remove_punctuation=True,

                 replace_urls=True, replace_numbers=True, stemming=True):

        self.strip_headers = strip_headers

        self.lower_case = lower_case

        self.remove_punctuation = remove_punctuation

        self.replace_urls = replace_urls

        self.replace_numbers = replace_numbers

        self.stemming = stemming

    def fit(self, X, y=None):

        return self

    def transform(self, X, y=None):

        X_transformed = []

        for email in X:

            text = email_to_text(email) or ""

            if self.lower_case:

                text = text.lower()

            if self.replace_urls and url_extractor is not None:

                urls = list(set(url_extractor.find_urls(text)))

                urls.sort(key=lambda url: len(url), reverse=True)

                for url in urls:

                    text = text.replace(url, " URL ")

            if self.replace_numbers:

                text = re.sub(r'\d+(?:\.\d*(?:[eE]\d+))?', 'NUMBER', text)

            if self.remove_punctuation:

                text = re.sub(r'\W+', ' ', text, flags=re.M)

            word_counts = Counter(text.split())

            if self.stemming and stemmer is not None:

                stemmed_word_counts = Counter()

                for word, count in word_counts.items():

                    stemmed_word = stemmer.stem(word)

                    stemmed_word_counts[stemmed_word] += count

                word_counts = stemmed_word_counts

            X_transformed.append(word_counts)

        return np.array(X_transformed)


이를 몇 가지 데이터에 적용해봅니다.


X_few = X_train[:3]

X_few_wordcounts = EmailToWordCounterTransformer().fit_transform(X_few)

X_few_wordcounts



제대로 작동합니다. 이제 단어 카운트를 벡터로 변환하기 위해서 또 다른 변환기를 만듭니다. 이 변환기는 (자주 나타나는 단어 순으로 정렬된) 어휘 목록을 구축하는 fit()메서드와 단어를 벡터로 바꾸는 transform()메서드를 가지며, 출력은 희소행렬입니다.


from scipy.sparse import csr_matrix


class WordCounterToVectorTransformer(BaseEstimator, TransformerMixin):

    def __init__(self, vocabulary_size=1000):

        self.vocabulary_size = vocabulary_size

    def fit(self, X, y=None):

        total_count = Counter()

        for word_count in X:

            for word, count in word_count.items():

                total_count[word] += min(count, 10)

        most_common = total_count.most_common()[:self.vocabulary_size]

        self.most_common_ = most_common

        self.vocabulary_ = {word: index + 1 for index, (word, count) in enumerate(most_common)}

        return self

    def transform(self, X, y=None):

        rows = []

        cols = []

        data = []

        for row, word_count in enumerate(X):

            for word, count in word_count.items():

                rows.append(row)

                cols.append(self.vocabulary_.get(word, 0))

                data.append(count)

        return csr_matrix((data, (rows, cols)), shape=(len(X), self.vocabulary_size + 1))



이어서

vocab_transformer = WordCounterToVectorTransformer(vocabulary_size=10)

X_few_vectors = vocab_transformer.fit_transform(X_few_wordcounts)

X_few_vectors


X_few_vectors.toarray() 의 결과는 무엇을 의미할까요?


3행 1열의 65는 세 번째 이메일이 어휘 목록에 없는 단어를 65개 가지고 있다는 뜻입니다.

그 다음의 0은 어휘 목록에 있는 첫 번째 단어가 한 번도 등장하지 않는다는 뜻이고, 그 다음의 1은 한 번 나타난다는 뜻입니다.

이 단어들이 무엇인지 확인하려면, 어휘 목록을 보면 됩니다. 첫 번째 단어가 the, 2번째 단어가 of임을 알 수 있습니다.

따라서, 세 번째 이메일에 the가 65번이나 나왔다는 점을 알 수 있죠.


이제, 스팸 분류기를 훈련시킬 준비가 되었습니다. 전체 데이터셋을 변환합니다.

====================================================

from sklearn.pipeline import Pipeline


preprocess_pipeline = Pipeline([

    ("email_to_wordcount", EmailToWordCounterTransformer()),

    ("wordcount_to_vector", WordCounterToVectorTransformer()),

])


X_train_transformed = preprocess_pipeline.fit_transform(X_train)

====================================================

from sklearn.linear_model import LogisticRegression

from sklearn.model_selection import cross_val_score


log_clf = LogisticRegression(random_state=42)

score = cross_val_score(log_clf, X_train_transformed, y_train, cv=3, verbose=3)

score.mean()

====================================================

결과는 0.985 이지만 쉬운 데이터셋임을 유념해야 합니다.


전체적인 내용을 보았으므로 Precision/Recall 출력합니다.


from sklearn.metrics import precision_score, recall_score


X_test_transformed = preprocess_pipeline.transform(X_test)


log_clf = LogisticRegression(random_state=42)

log_clf.fit(X_train_transformed, y_train)


y_pred = log_clf.predict(X_test_transformed)


print("정밀도: {:.2f}%".format(100 * precision_score(y_test, y_pred)))

print("재현율: {:.2f}%".format(100 * recall_score(y_test, y_pred)))

====================================================

정밀도 : 93.00%

재현율 : 97.89% 


우우~ 너무나 어렵네용 ㅎㅎ

이제 다음 포스팅은 모델 훈련입니다!


 블로그 

출처


이 글의 상당 부분은  [핸즈온 머신러닝, 한빛미디어/오렐리앙 제롱/박해선] 서적을 참고하였습니다.


나머지는 부수적인 함수나 메서드에 대해 부족한 설명을 적어두었습니다.

학습용으로 포스팅 하는 것이기 때문에 복제보다는 머신러닝에 관심이 있다면 구매해보시길 추천합니다.


도움이 되셨다면 로그인 없이 가능한

아래 하트♥공감 버튼을 꾹 눌러주세요!