Im Web fehlen häufig Angaben darüber, in welcher Sprache eine Seite geschrieben ist. Trotzdem würden wir gerne den Benutzern unseres Dienstes nur diejenigen Inhalte anzeigen, die sie auch verstehen. Auch in anderen Bereichen erhalten wir oft Texte, deren Sprachen wir nicht kennen.

Daher wollen wir ein einfaches System zur Klassifikation einer Sprache in Python entwickeln. Hierzu versuchen wir, ein n-gram-Modell auf Zeichenebene zu verwenden.

Trainingsdaten

Zum Trainieren des Modells verwende ich die Google-1-gram-Daten. Besser wären zwar die Google-2gram-Date, da hier Berechnungen über Wortgrenzen möglich sind, doch der 2gram-Datensatz ist erheblich größer.

Die einzelnen Dateien speichere ich nach Sprachen getrennt in folgender Ordnerstruktur:

languages/
    english/
    french/
    german/
    ...

Hat man die kompletten 1gram-Daten mehrerer Sprachen heruntergeladen, muss man sie noch entpacken. Dies erledigt man am besten automatisiert mit einem Shellskript:

for file in *.gz; do
    gunzip -k $file
done

Modellbildung

Zunächst benötigen wir eine Funktion, die uns die Buchstaben-n-Gramme aus den Google-Daten extrahiert. Lasst euch im Folgenden nicht von den Google-1grams (bezogen auf Wortanzahl) und den Buchstaben-ngrams (bezogen auf Zeichenanzahl) verwirren.

Mit folgender Funktion können wir aus einer Zeichenkette alle Buchstaben-ngrams extrahieren:

def ngrams(text, n=2):
    for i in range(len(text)-1):
        yield (text[i], text[i+1])

Die Vorkommen jedes n-Grams zählen wir mithilfe eines Dictionaries. In einem ersten Schritt berechnen wir dazu für jede Datei die n-Gram-Verteilung. Damit dies schneller geht, setzen wir Parallelisierung über concurrent.futures.ThreadPoolExecutor ein. Die Ausgabe wird in gleiche Dateien in einem Ordner model geschrieben.

import glob
import os.path
import pickle
import concurrent.futures

from ngrams import ngrams


def create_model_file(filepath):
    model = {}

    language = os.path.basename(os.path.dirname(filepath))
    filename = os.path.basename(filepath)

    print(filepath)

    with open(filepath) as f, open(os.path.join('model/%s/' % language, os.path.basename(filepath)), 'wb') as f_out:
        for line in f:
            try: # some lines contain only three columns, maybe "word" = empty string
                word, year, count, _ = line.strip().split('\t')
            except ValueError:
                continue

            # only consider new words and words that occur often enough to seem real
            if int(count) < 100 or int(year) < 2000:
                continue
            
            for ngram in ngrams(word):
                model.setdefault(ngram, 0)
                model[ngram] += int(count)

        pickle.dump(model, f_out)
        os.unlink(filepath)


futures = []
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
for filepath in glob.glob('data/*/*'):
    if filepath.endswith('.gz'): # ignore the gz files we kept
        continue

    f = executor.submit(create_model_file, filepath)
    futures.append(f)

# we use this for a quick check if all is ok or if exceptions were thrown
for x in concurrent.futures.as_completed(futures):
    print(x)

Haben wir diese Teilmodelle pro Datei ausgelesen, müssen wir noch alle Dateien einer Sprache zu einem kompletten Sprachmodell kombinieren. Hierzu schreiben wir eine weitere Datei, die die Teilmodelle alle einliest und gemäß der Gesamtanzahl der enthaltenen n-Grams anteilig auf das Gesamtmodell aufaddiert. Das Ergebnis wird in einen Ordner finished_models geschrieben.

import glob
import pickle
import os.path

model_parts = {}
for filepath in glob.glob('model/*/*'):
    language = os.path.basename(os.path.dirname(filepath))

    model_parts.setdefault(language, [])

    with open(filepath, 'rb') as f:
        model_part = pickle.load(f)
        sum_all = sum(model_part.values())

        model_parts[language].append({
            'sum': sum_all,
            'percentages': dict([(kv[0], kv[1]/sum_all) for kv in model_part.items()])
        })

models = {}
for language, language_parts in model_parts.items():
    with open('finished_models/%s' % language, 'wb') as f:
        overall_sum = sum(map(lambda x: x['sum'], language_parts))
        
        models.setdefault(language, {})
        
        for language_part in language_parts:
            percentages = language_part['percentages']
            for ngram, percentage in percentages.items():
                models[language].setdefault(ngram, 0)
                models[language][ngram] += percentage * language_part['sum'] / overall_sum

        pickle.dump(models[language], f)

Detektion

Hat man alle Modelle generiert, kann man durch einen Vergleich der n-Gram-Verteilung des Referenztexts mit allen Modellen berechnen, welche sich am ähnlichsten sind. Ich habe hierzu den Mittleren-Quadratischen-Fehler verwendet, es gäbe jedoch auch statistische Tests für Wahrscheinlichkeitsverteilungen. Jedoch schien mir der Chi-Quadrat-Test, der für diskrete Verteilungen empfohlen wurde, einen Fehler von Unendlich zu ergeben, wenn im erwarteten Wertebereich (also in unserem Sprachmodell) ein Verteilungswert 0 ist. Wahrscheinlich könnte man diesen Fall vernachlässigen, da unsere Modelle auf sehr vielen Daten trainiert wurden, jedoch schien mir diese Situation dennoch seltsam.

Der Referenztext wird dann über die stdin eingelesen.

import os, os.path
import sys
import pickle

from ngrams import ngrams


def mean_squared_error(testset, model):
    error = 0

    indices = set(testset.keys()).union(set(model.keys()))
    for index in indices:
        if index not in testset:
            error += model[index]
        elif index not in model:
            error += testset[index]
        else:
            error += (testset[index] - model[index])**2

    return error


models = {}

folder = 'finished_models'

for language in os.listdir(folder):
    filepath = os.path.join(folder, language)

    with open(filepath, 'rb') as f:
        models[language] = pickle.load(f)

words = ' '.join(map(str.strip, sys.stdin)).split()

testset = {}
for word in words:
    for ngram in ngrams(word):
        testset.setdefault(ngram, 0)
        testset[ngram] += 1

all_sum = sum(testset.values())
testset = dict([(kv[0], kv[1] / all_sum) for kv in testset.items()])

errors = {}
for language in models:
    errors[language] = mean_squared_error(testset, models[language])
print(errors)

In Dictionary errors sind dann die Fehler für alle Sprachen gespeichert. Die Sprache mit minimalem Fehler ist die vermutete Sprache des Referenztexts.

I do not maintain a comments section. If you have any questions or comments regarding my posts, please do not hesitate to send me an e-mail to blog@stefan-koch.name.