Python unterstützt die Verwendung mehrerer CPU-Kerne durch das Modul multiprocessing. Grundsätzlich ist die Verwendung von multiprocessing in der Semantik ähnlich wie multithreading.

Anstatt threading.Thread zu verwenden, schreibt man dan eine Unterklasse von multiprocessing.Process um eigene Prozesse zu erstellen. Diese werden dann vom Python-Interpreter erzeugt und vom Betriebssystem ggfs. auf mehrere CPU-Kerne verteilt.

Wir wollen im folgenden an einem tatsächlich sinnvollen Beispiel durchgehen, wie man das multiprocessing Modul von Python verwenden kann. Dazu schauen wir uns die Potenziereung von Jordan-Matrizen an.

Jordan-Matrizen sind solche Matrizen, die lediglich auf der Diagonalen Einträge (die Eigenwerte) haben und auf der Nebendiagonalen Einsen oder Nullen. Alle anderen Einträge sind Null. Da in Jordan-Matrizen jedes Jordan-Kästchen unabhängig von allen anderen Jordan-Kästchen multipliziert wird (da in denselben Spalten darunter/darüber oder in denselben Spalten daneben nur Null-Einträge stehen), können wir die Matrix in viele Teilmatrizen aufspalten und diese unabhängig voneinander potenzieren. Dies führt dazu, dass wir eine zerteilte 1000x1000-Matrix in etwa einer Sekunde berechnen können, während die Standard-Berechnung mehrere Minuten dauert.

Die Parallelität auf mehreren Prozessen kommt dann ins Spiel, wenn die einzelnen Teilmatrizen selbst auch wieder relativ groß sind. Nehmen wir an, wir haben eine 3.000x3.000 Matrix und es gibt hier Teilmatrizen, die selbst wiederum 1000x1000 Felder groß sind. Dann dauert die Berechnung der Potenz jeder einzelnen Teilmatrix natürlich wieder mehrere Minuten und man kann die Parallelität auf mehreren Kernen ausnutzen, um gleichzeitig 3 Berechnungen von 1000x1000 Matrizen durchzuführen.

Schauen wir uns zunächst den generellen Ablauf des Algorithmus an:

  • Zunächst müssen wir die große Matrix in kleinere Matrizen aufsplitten
  • Anschließend wenden wir die Berechnung in verschiedenen Prozessen auf jede Teilmatrix einzeln an
  • Danach fügen wir die Ergebnisse wieder zu einer Matrix zusammen

Da bei paralleler Verarbeitung nicht garantiert ist, in welcher Reihenfolge die Prozesse abgearbeitet werden, müssen wir uns immer speichern, an welcher Position eine Teilmatrix stand. Nur so können wir das Ergebnis der Teil-Multiplikation wieder an richtiger Stelle in die große Matrix einfügen.

Vorarbeit

Zunächst definieren wir zwei Hilfsfunktionen, die es uns erlauben, einfach Matrizen mit lauter Nullen zu erstellen oder Matrizen mit Nullen zu erweitern.

def zero_matrix(width, height):
    return [[0 for j in range(width)] for i in range(height)]

def zerofill_matrix(matrix, width, height):
    matrix_width = 0
    if len(matrix) != 0:
        matrix_width = len(matrix[0])
    
    for i in range(height):
        matrix.append([0 for i in range(matrix_width)])
    
    for i in range(len(matrix)):
        for j in range(width):
            matrix[i].append(0)
    return matrix

Zerteilen der Matrix

Zuerst müssen wir die Matrix in ihre einzelnen Bestandteile, das heißt die Jordan-Kästchen aufteilen. Dazu prüfen wir, wo in der Matrix auf der Nebendiagonalen (in unserem Fall der oberen Nebendiagonalen) Nullen stehen. An den Stellen, an denen Nullen auftreten, dürfen wir die Matrix in Untermatrizen zerteilen. split_matrix() prüft, wo diese Nullen stehen, und extract_matrix() erstellt dann in gegebenen Grenzen eine Kopie der Teilmatrix.

def extract_matrix(matrix, fro, to):
    new_matrix = zero_matrix(to-fro, to-fro)
    for i in range(len(new_matrix)):
        for j in range(len(new_matrix[0])):
            new_matrix[i][j] = matrix[fro+i][fro+j]
    return new_matrix
  
def split_matrix(matrix):
    sub_matrices = []
    
    last_split = 0
    for i in range(len(matrix)):
        if i == len(matrix)-1:
            sub_matrices.append(extract_matrix(matrix, last_split, len(matrix)))
        elif matrix[i][i+1] == 0:
            sub_matrices.append(extract_matrix(matrix, last_split, i+1))
            last_split = i+1
    return sub_matrices

Berechnung der Matrix-Potenz in Prozessen

Nachdem wir die Matrix nun in kleine Teilmatrizen unterteilt haben, können wir uns endlich der Hauptaufgabe widmen: Die Berechnung der Potenz durch Verteilung der Aufgaben an verschiedene Prozesse (ggfs. auf verschiedenen CPUs). Dabei greifen wir auf einen Artikel von O’Reilly zur Verwendung von multiprocessing zurück.

Zunächst erzeugen wir eine Unterklasse von multiprocessing.Process, um neue Prozesse erstellen zu können. Wir können eine Queue verwenden, um die Daten zwischen verschiedenen Prozesen auszutauschen. Dabei erstellen wir gleich zwei Queues: eine für die Aufträge und eine für die Ergebnisse. Außerdem speichern wir zu jedem Eintrag in einer Queue auch die Reihenfolge, damit wir die Matrix später wieder richtig zusammensetzen können.

class Matrixmultiplier(multiprocessing.Process):
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means we should exit
                print('%s: Exiting' % proc_name)
                break
            answer = next_task.calculate()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, matrix):
        self.matrix = matrix
    
    def calculate(self):
        return matrix_multiply(self.matrix, self.matrix)

An dieser Stelle fügen wir auch gleich einen einfachen Algorithmus zur Matrix-Multiplikation hinzu:

def matrix_multiply(matrix1, matrix2):
    new_matrix = zero_matrix(len(matrix1), len(matrix2[0]))
    
    for i in range(len(matrix1)):
        for j in range(len(matrix2[0])):
            for k in range(len(matrix2)):
                new_matrix[i][j] += matrix1[i][k]*matrix2[k][j]
  
    return new_matrix

Und jetzt kommen wir zur Hauptanwendung. Wir erstellen uns die beiden genannten Queues und Prozesse in der Anzahl unserer Prozessoren. Dann speichern wir alle Teilmatrizen als Aufgaben in der Queue und fügen für jeden Prozess noch eine None-_Aufgabe_ als Markierung für das Ende hinzu. Anschließend warten wir auf die Ergebnisse. Sobald alle Ergebnisse da sind, sortieren wir sie und übergeben die Teilmatrizen an eine Funktion zur Vereinigung von Matrizen, die im nächsten Abschnitt dargestellt wird.

if __name__ == '__main__':
    matrix = create_matrix(3, 2)
    submatrices = split_matrix(matrix)
    
    # Establish communication queues
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    
    # Start processes
    num_processes = multiprocessing.cpu_count()
    processes = [Matrixmultiplier(tasks, results) for i in range(num_processes)]
    for w in processes:
        w.start()
    
    # Enqueue jobs
    counter = 0
    for submatrix in submatrices:
        tasks.put((counter, Task(submatrix)))
        counter += 1
    
    # Add a poison pill for each consumer
    for i in range(num_processes):
        tasks.put((None, None))
    
    # Start printing results
    num_jobs = len(submatrices)
    resultlist = []
    while num_jobs:
        result = results.get() # all partial results
        resultlist.append(result)
        num_jobs -= 1
    
    # Each result has its position at [0] of the tuple and the result at [1]
    sorted(resultlist)
    
    print("A = ")
    print(matrix)
    print("A^2 = ")
    print(union_matrix([result[1] for result in resultlist]))

Außerdem müssen wir natürlich noch eine Funktion zur Generierung von großen Jordan-Matrizen implementieren. Diese nimmt Parameter für die Gesamtbreite, für die maximale Breite eines Jordan-Kästchens und für die maximale Größe (positiv und negativ) eines Eigenwerts entgegen.

def create_matrix(size = 1000, max_jordan_size = 100, max_eigenvalue = 1000):
    cur = 0
    matrix = zero_matrix(size, size)
    while cur < size:
        # length of the first jordan part
        length = random.randint(1, max_jordan_size)
        eigenvalue = random.randint(-max_eigenvalue, max_eigenvalue)
        
        for i in range(length):
            if cur + i < size:
                matrix[cur + i][cur + i] = eigenvalue
        for i in range(length-1):
            if cur + i + 1 < size:
                matrix[cur + i][cur + i + 1] = 1
        
        cur += length
    
    return matrix

Vereinigen der gegeben Teilmatrizen

Nachdem wir die Berechnung durchgeführt haben, müssen wir die erhaltenen Teilmatrizen noch zu einer großen vereinigen. Benötigen wir eine Funktion, die eine Matrix der korrekten Größe erstellt und die Ergebnisse in die große Matrix kopiert. Anstatt direkt eine Matrix mit der kompletten Größe zu erzeugen, vergrößern wir die existierende Matrix in jedem Schritt um die benötigte Anzahl an Zeilen und Spalten.

def union_matrix(matrices):
    full_matrix = []
    offset = 0
    for matrix in matrices:
        zerofill_matrix(full_matrix, len(matrix), len(matrix))
        for i in range(len(matrix)):
            for j in range(len(matrix)):
                full_matrix[offset+i][offset+j] = matrix[i][j]
            
        # Set new offset
        offset = len(matrix)
    
    return full_matrix

Download

Der gesamte Code zum Download: jordan.py

I do not maintain a comments section. If you have any questions or comments regarding my posts, please do not hesitate to send me an e-mail to blog@stefan-koch.name.