Threads

Content:

  1. Introduction
  2. Concept
  3. Basic threads
  4. Thread pool
  5. Advanced
  6. Synchronization
  7. Desktop GUI
  8. Server

Introduction:

Threads and multi-threaded applications are important concepts in modern software that any software developer needs to understand.

Concept

A thread is a basic execution unit in most modern operating systems.

A process is the unit running the application.

A process has one or more associated threads.

Threads share heap but have individual stacks.

An application with only one thread is called a single-threaded application. An application with more than one thread is called a multi-threaded application.

A multi-threaded application is able to do multiple things in parallel. Which can be very convenient both to make the code design cleaner and to reduce running time.

Note that it is possible and in fact very common to have more threads than can actually be executed in parallel. Let us say that we have a server with 2 CPU's that are 8 core with hyperthreading aka 2s16c32t - the server can execute 32 threads in parallel, but it is not a problem for a server application to start 500 or 1000 threads. It may often be benficial as it is common to have the majority of threads waiting for something and not be ready to execute.

25 years ago it would have been relevant to distinguish between kernel mode threads and user mode (aka green) threads. Kernel mode threads are threads actually scheduled by the operating systems. User mode (aka green) threads are an abstraction handled by some runtime sitting between the application and the operating system. Today practically all operating systems supports kernel mode threads and user mode (aka green) threads are no longer interesting. There are some problems with user mode (aka green) threads as some actions that with kernel mode threads would only block one thread can result in all threads being blocked.

Basic threads

The basic thread operations are:

And a multi-threaded application can be written using just those basic constructs.

Thread support is builtin to Java.

Java has two ways to define what will run in a thread.

One way is to create a class that extends Thread and overide the run method.

package threads;

public class Basic1 {
    public static class Data {
        private String s;
        public Data(String s) {
            this.s = s;
        }
        public String getS() {
            return s;
        }
        public void setS(String s) {
            this.s = s;
        }
    }
    public static class Processor extends Thread {
        private Data d;
        public Processor(Data d) {
            this.d = d;
        }
        @Override
        public void run() {
            try {
                // simulate a lot of work that takes 0.1 second
                String s = d.getS();
                Thread.sleep(100);
                d.setS(s + 'X');
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void test(int njobs, int nthreads) {
        // setup data
        Data[] d = new Data[njobs];
        for(int i = 0; i < njobs; i++) {
            d[i] = new Data("X");
        }
        // process
        long t1 = System.currentTimeMillis();
        int k = 0;
        for(int i = 0; i < njobs / nthreads; i++) {
            // create threads
            Thread[] t = new Thread[nthreads];
            for(int j = 0; j < nthreads; j++) {
                t[j] = new Processor(d[k]);
                k++;
            }
            // start threads
            for(int j = 0; j < nthreads; j++) {
                t[j].start();
            }
            // wait for threads to complete
            for(int j = 0; j < nthreads; j++) {
                try {
                    t[j].join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d jobs executing in %d threads : %.1f seconds\n", njobs, nthreads, (t2 - t1) / 1000.0);
        // check data
        for(int i = 0; i < njobs; i++) {
            if(!d[i].getS().equals("XX")) System.out.println("Ooops");
        }
    }
    public static void main(String[] args) {
        test(256, 1);
        test(256, 2);
        test(256, 4);
        test(256, 8);
        test(256, 16);
        test(256, 32);
        test(256, 64);
    }
}

Example output:

256 jobs executing in 1 threads : 25.7 seconds
256 jobs executing in 2 threads : 12.8 seconds
256 jobs executing in 4 threads : 6.4 seconds
256 jobs executing in 8 threads : 3.2 seconds
256 jobs executing in 16 threads : 1.6 seconds
256 jobs executing in 32 threads : 0.9 seconds
256 jobs executing in 64 threads : 0.5 seconds

Thread support is builtin to Java.

Java has two ways to define what will run in a thread.

Another way is to create a class that implements Runnable and supply a run method.

package threads;

public class Basic2 {
    public static class Data {
        private String s;
        public Data(String s) {
            this.s = s;
        }
        public String getS() {
            return s;
        }
        public void setS(String s) {
            this.s = s;
        }
    }
    public static class Processor implements Runnable {
        private Data d;
        public Processor(Data d) {
            this.d = d;
        }
        @Override
        public void run() {
            try {
                // simulate a lot of work that takes 0.1 second
                String s = d.getS();
                Thread.sleep(100);
                d.setS(s + 'X');
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void test(int njobs, int nthreads) {
        // setup data
        Data[] d = new Data[njobs];
        for(int i = 0; i < njobs; i++) {
            d[i] = new Data("X");
        }
        // process
        long t1 = System.currentTimeMillis();
        int k = 0;
        for(int i = 0; i < njobs / nthreads; i++) {
            // create threads
            Thread[] t = new Thread[nthreads];
            for(int j = 0; j < nthreads; j++) {
                t[j] = new Thread(new Processor(d[k]));
                k++;
            }
            // start threads
            for(int j = 0; j < nthreads; j++) {
                t[j].start();
            }
            // wait for threads to complete
            for(int j = 0; j < nthreads; j++) {
                try {
                    t[j].join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d jobs executing in %d threads : %.1f seconds\n", njobs, nthreads, (t2 - t1) / 1000.0);
        // check data
        for(int i = 0; i < njobs; i++) {
            if(!d[i].getS().equals("XX")) System.out.println("Ooops");
        }
    }
    public static void main(String[] args) {
        test(256, 1);
        test(256, 2);
        test(256, 4);
        test(256, 8);
        test(256, 16);
        test(256, 32);
        test(256, 64);
    }
}

Example output:

256 jobs executing in 1 threads : 25.7 seconds
256 jobs executing in 2 threads : 12.8 seconds
256 jobs executing in 4 threads : 6.5 seconds
256 jobs executing in 8 threads : 3.3 seconds
256 jobs executing in 16 threads : 1.6 seconds
256 jobs executing in 32 threads : 0.9 seconds
256 jobs executing in 64 threads : 0.4 seconds

.NET comes with a Thread class.

using System;
using System.Threading;

namespace Basic
{
    public class Program
    {
        public class Data
        {
            public string S { get; set; }
        }
        public class Processor
        {
            private Data d;
            public Processor(Data d)
            {
                this.d = d;
            }
            public void Run()
            {
                // simulate a lot of work that takes 0.1 second
                string s = d.S;
                Thread.Sleep(100);
                d.S = s + "X";
            }
        }
        public static void Test(int njobs, int nthreads)
        {
            // setup data
            Data[] d = new Data[njobs];
            for(int i = 0; i < njobs; i++)
            {
                d[i] = new Data { S = "X" };
            }
            // process
            DateTime dt1 = DateTime.Now;
            int k = 0;
            for(int i = 0; i < njobs / nthreads; i++)
            {
                // create threads
                Thread[] t = new Thread[nthreads];
                for(int j = 0; j < nthreads; j++)
                {
                    t[j] = new Thread((new Processor(d[k])).Run);
                    k++;
                }
                // start threads
                for(int j = 0; j < nthreads; j++)
                {
                    t[j].Start();
                }
                // wait for threads to complete
                for(int j = 0; j < nthreads; j++)
                {
                    t[j].Join();
                }
            }
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} jobs executing in {1} threads : {2:F1} seconds", njobs, nthreads, (dt2 - dt1).TotalSeconds);
            // check data
            for(int i = 0; i < njobs; i++)
            {
                if(d[i].S != "XX") Console.WriteLine("Ooops");
            }
        }
        public static void Main(string[] args)
        {
            Test(256, 1);
            Test(256, 2);
            Test(256, 4);
            Test(256, 8);
            Test(256, 16);
            Test(256, 32);
            Test(256, 64);
        }
    }
}

Example output:


256 jobs executing in 1 threads : 25.8 seconds
256 jobs executing in 2 threads : 14.3 seconds
256 jobs executing in 4 threads : 7.8 seconds
256 jobs executing in 8 threads : 4.3 seconds
256 jobs executing in 16 threads : 2.2 seconds
256 jobs executing in 32 threads : 1.1 seconds
256 jobs executing in 64 threads : 0.8 seconds

.NET comes with a Thread class.

Imports System
Imports System.Threading

Namespace Basic
    Public Class Program
        Public Class Data
            Public Property S() As String
        End Class
        Public Class Processor
            Private d As Data
            Public Sub New(d As Data)
                Me.d = d
            End Sub
            Public Sub Run()
                ' simulate a lot of work that takes 0.1 second
                Dim s As String = d.S
                Thread.Sleep(100)
                d.S = s & "X"
            End Sub
        End Class
        Public Shared Sub Test(njobs As Integer, nthreads As Integer)
            ' setup data
            Dim d As Data() = New Data(njobs - 1) {}
            For i As Integer = 0 To njobs - 1
                d(i) = New Data() With { .S = "X" }
            Next
            ' process
            Dim dt1 As DateTime = DateTime.Now
            Dim k As Integer = 0
            For i As Integer = 0 To njobs \ nthreads - 1
                ' create threads
                Dim t As Thread() = New Thread(nthreads - 1) {}
                For j As Integer = 0 To nthreads - 1
                    t(j) = New Thread(AddressOf (New Processor(d(k))).Run)
                    k += 1
                Next
                ' start threads
                For j As Integer = 0 To nthreads - 1
                    t(j).Start()
                Next
                ' wait for threads to complete
                For j As Integer = 0 To nthreads - 1
                    t(j).Join()
                Next
            Next
            Dim dt2 As DateTime = DateTime.Now
            Console.WriteLine("{0} jobs executing in {1} threads : {2:F1} seconds", njobs, nthreads, (dt2 - dt1).TotalSeconds)
            ' check data
            For i As Integer = 0 To njobs - 1
                If d(i).S <> "XX" Then
                    Console.WriteLine("Ooops")
                End If
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Test(256, 1)
            Test(256, 2)
            Test(256, 4)
            Test(256, 8)
            Test(256, 16)
            Test(256, 32)
            Test(256, 64)
        End Sub
    End Class
End Namespace

Example output:

256 jobs executing in 1 threads : 25.9 seconds
256 jobs executing in 2 threads : 13.9 seconds
256 jobs executing in 4 threads : 7.6 seconds
256 jobs executing in 8 threads : 4.0 seconds
256 jobs executing in 16 threads : 2.0 seconds
256 jobs executing in 32 threads : 1.0 seconds
256 jobs executing in 64 threads : 1.0 seconds

Windows comes with thread support in Win32 API.

There are two different ways to start a thread.

One way is the Win32 API call CreateThread.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <windows.h>

#define MAX_STR_LEN 1024

struct data {
    char s[MAX_STR_LEN];
};

DWORD WINAPI run(void *p)
{
    struct data *d;
    char s[MAX_STR_LEN];
    d = p;
    /* simulate a lot of work that takes 0.1 second */
    strcpy(s, d->s);
    Sleep(100);
    strcpy(d->s, s);
    strcat(d->s, "X");
    return 0;
}

void test(int njobs, int nthreads)
{
    struct data *d;
    HANDLE *t;
    time_t t1, t2;
    int i, j, k;
    /* setup data */
    d = malloc(njobs * sizeof(struct data));
    for(i = 0; i < njobs; i++)
    {
        strcpy(d[i].s, "X");
    }
    /* process */
    t1 = time(NULL);
    k = 0;
    for(i = 0; i < njobs / nthreads; i++)
    {
        /* create and start threads */
        t = malloc(nthreads * sizeof(HANDLE));
        for(j = 0; j < nthreads; j++)
        {
            t[j] = CreateThread(NULL, 0, run, &d[k], 0, NULL);
            k++;
        }
        /* wait for threads to complete */
        for(j = 0; j < nthreads; j++)
        {
            WaitForSingleObject(t[j], INFINITE);
            CloseHandle(t[j]);
        }
        free(t);
    }
    t2 = time(NULL);
    printf("%d jobs executing in %d threads : %d seconds\n", njobs, nthreads, (int)(t2 - t1));
    /* check data */
    for(i = 0; i < njobs; i++)
    {
        if(strcmp(d[i].s, "XX") != 0) printf("Ooops\n");
    }
    free(d);
}

int main()
{
    test(256, 1);
    test(256, 2);
    test(256, 4);
    test(256, 8);
    test(256, 16);
    test(256, 32);
    test(256, 64);
    return 0;
}

Example output:

256 jobs executing in 1 threads : 26 seconds
256 jobs executing in 2 threads : 12 seconds
256 jobs executing in 4 threads : 7 seconds
256 jobs executing in 8 threads : 3 seconds
256 jobs executing in 16 threads : 2 seconds
256 jobs executing in 32 threads : 0 seconds
256 jobs executing in 64 threads : 1 seconds

Windows comes with thread support in Win32 API.

There are two different ways to start a thread.

Another way is to use one of the MSVCRTL function _beginthread or _beginthreadex. _beginthread is not good as it does not provide a convenient way to manage the created thread. _beginthreadex does that and is actually preferred over CreateThread (for C/C++ applications), because it prepare the MSVCRTL for multi-threaded usage.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <windows.h>
#include <process.h>

#define MAX_STR_LEN 1024

struct data {
    char s[MAX_STR_LEN];
};

unsigned int __stdcall run(void *p)
{
    struct data *d;
    char s[MAX_STR_LEN];
    d = p;
    /* simulate a lot of work that takes 0.1 second */
    strcpy(s, d->s);
    Sleep(100);
    strcpy(d->s, s);
    strcat(d->s, "X");
    return 0;
}

void test(int njobs, int nthreads)
{
    struct data *d;
    HANDLE *t;
    time_t t1, t2;
    int i, j, k;
    /* setup data */
    d = malloc(njobs * sizeof(struct data));
    for(i = 0; i < njobs; i++)
    {
        strcpy(d[i].s, "X");
    }
    /* process */
    t1 = time(NULL);
    k = 0;
    for(i = 0; i < njobs / nthreads; i++)
    {
        /* create and start threads */
        t = malloc(nthreads * sizeof(HANDLE));
        for(j = 0; j < nthreads; j++)
        {
            t[j] = (HANDLE)_beginthreadex(NULL, 0, run, &d[k], 0, NULL);
            k++;
        }
        /* wait for threads to complete */
        for(j = 0; j < nthreads; j++)
        {
            WaitForSingleObject(t[j], INFINITE);
            CloseHandle(t[j]);
        }
        free(t);
    }
    t2 = time(NULL);
    printf("%d jobs executing in %d threads : %d seconds\n", njobs, nthreads, (int)(t2 - t1));
    /* check data */
    for(i = 0; i < njobs; i++)
    {
        if(strcmp(d[i].s, "XX") != 0) printf("Ooops\n");
    }
    free(d);
}

int main()
{
    test(256, 1);
    test(256, 2);
    test(256, 4);
    test(256, 8);
    test(256, 16);
    test(256, 32);
    test(256, 64);
    return 0;
}

Example output:

256 jobs executing in 1 threads : 26 seconds
256 jobs executing in 2 threads : 13 seconds
256 jobs executing in 4 threads : 6 seconds
256 jobs executing in 8 threads : 3 seconds
256 jobs executing in 16 threads : 2 seconds
256 jobs executing in 32 threads : 1 seconds
256 jobs executing in 64 threads : 0 seconds

POSIX threads are available on all POSIX compliant operating system. That means Unix (including MacOS X), Linux, IBM mainframe, OpenVMS etc.. In practice almost everything except Windows.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <unistd.h>
#include <pthread.h>

#define MAX_STR_LEN 1024

struct data {
    char s[MAX_STR_LEN];
};

void *run(void *p)
{
    struct data *d;
    char s[MAX_STR_LEN];
    d = p;
    /* simulate a lot of work that takes 0.1 second */
    strcpy(s, d->s);
    usleep(100000);
    strcpy(d->s, s);
    strcat(d->s, "X");
    return NULL;
}

void test(int njobs, int nthreads)
{
    struct data *d;
    pthread_t *t;
    time_t t1, t2;
    int i, j, k;
    void *p;
    /* setup data */
    d = malloc(njobs * sizeof(struct data));
    for(i = 0; i < njobs; i++)
    {
        strcpy(d[i].s, "X");
    }
    /* process */
    t1 = time(NULL);
    k = 0;
    for(i = 0; i < njobs / nthreads; i++)
    {
        /* create and start threads */
        t = malloc(nthreads * sizeof(pthread_t));
        for(j = 0; j < nthreads; j++)
        {
            pthread_create(&t[j], NULL, run, &d[k]);
            k++;
        }
        /* wait for threads to complete */
        for(j = 0; j < nthreads; j++)
        {
            pthread_join(t[j], &p);
        }
        free(t);
    }
    t2 = time(NULL);
    printf("%d jobs executing in %d threads : %d seconds\n", njobs, nthreads, (int)(t2 - t1));
    /* check data */
    for(i = 0; i < njobs; i++)
    {
        if(strcmp(d[i].s, "XX") != 0) printf("Ooops\n");
    }
    free(d);
}

int main()
{
    test(256, 1);
    test(256, 2);
    test(256, 4);
    test(256, 8);
    test(256, 16);
    test(256, 32);
    test(256, 64);
    return 0;
}

Example output:

256 jobs executing in 1 threads : 26 seconds
256 jobs executing in 2 threads : 12 seconds
256 jobs executing in 4 threads : 7 seconds
256 jobs executing in 8 threads : 3 seconds
256 jobs executing in 16 threads : 2 seconds
256 jobs executing in 32 threads : 0 seconds
256 jobs executing in 64 threads : 1 seconds

Delphi/Lazarus provide a wrapper API around the underlying operating systems thread API (Win32 API on Windows and Pthreads on *nix).

program Basic;

uses
  Classes, SysUtils, Windows;

type
  Data = class(TObject)
    constructor Create(s : string);
    function GetS : string;
    procedure SetS(s : string);
    property S : string read GetS write SetS;
  private
    _s : string;
  end;
  Processor = class(TThread)
    constructor Create(d : Data);
    function GetD : Data;
    property D : Data read GetD;
  protected
    procedure Execute; override;
  private
    _d : Data;
  end;

constructor Data.Create(s : string);

begin
  _s := s;
end;

function Data.GetS : string;

begin
  GetS := _s;
end;

procedure Data.SetS(s : string);

begin
  _s := s;
end;

constructor Processor.Create(d : Data);

begin
  inherited Create(true);
  _d := d;
end;

function Processor.GetD : Data;

begin
  GetD := _d;
end;

procedure Processor.Execute;

var
  s : string;

begin
  (* simulate a lot of work that takes 0.1 second *)
  s := D.S;
  Sleep(100);
  D.S := s + 'X';
end;

procedure Test(njobs, nthreads : integer);

var
  d : array of Data;
  t : array of TThread;
  t1, t2 : integer;
  dt : double;
  i, j, k : integer;

begin
  (* setup data*)
  SetLength(d, njobs);
  for i := 0 to njobs-1 do begin
    d[i] := Data.Create('X');
  end;
  (* process *)
  t1 := GetTickCount;
  k := 0;
  for i := 0 to (njobs div nthreads - 1) do begin
    SetLength(t, nthreads);
    (* create threads *)
    for j := 0 to nthreads-1 do begin
      t[j] := Processor.Create(d[k]);
      k := k + 1;
    end;
    (* start threads *)
    for j := 0 to nthreads-1 do begin
      t[j].Start;
    end;
    (* wait for threads to complete *)
    for j := 0 to nthreads-1 do begin
      t[j].WaitFor;
      t[j].Terminate;
      t[j].Free;
    end;
  end;
  t2 := GetTickCount;
  dt := (t2 - t1) / 1000;
  writeln(njobs:1,' jobs executing in ',nthreads:1,' threads : ',dt:1:1,' seconds');
  (* check data *)
  for i := 0 to njobs-1 do begin
    if d[i].S <> 'XX' then writeln('Ooops');
    d[i].Free;
  end;
end;

begin
  Test(256, 1);
  Test(256, 2);
  Test(256, 4);
  Test(256, 8);
  Test(256, 16);
  Test(256, 32);
  Test(256, 64);
end.

Example output:

256 jobs executing in 1 threads : 25.6 seconds
256 jobs executing in 2 threads : 12.8 seconds
256 jobs executing in 4 threads : 6.4 seconds
256 jobs executing in 8 threads : 3.2 seconds
256 jobs executing in 16 threads : 1.6 seconds
256 jobs executing in 32 threads : 0.8 seconds
256 jobs executing in 64 threads : 0.4 seconds

Python comes with a threading module.

The standard Python implementation CPython has a GIL (Global Interpreter Lock) that means that it can only interpret Python code in one thread at a time. Note that non-Python-interpreting activities like IO can still be done in parallel. Some other Python implementations like Jython does not have a GIL.

import threading
import time

class Data(object):
    def __init__(self, _s):
        self.s = _s

class Processor(threading.Thread):
    def __init__(self, _d):
        super(Processor, self).__init__()
        self.d = _d
    def run(self):
        # simulate a lot of work that takes 0.1 second
        s = self.d.s
        time.sleep(0.1)
        self.d.s = s + "X"

def test(njobs, nthreads):
    # setup data
    d = []
    for i in range(njobs):
        d.append(Data("X"))
    # process
    t1 = time.time()
    k = 0
    for i in range(njobs / nthreads): # // in Python 3.x
        # create threads
        t = []
        for j in range(nthreads):
            t.append(Processor(d[k]))
            k = k + 1
        # start threads
        for j in range(nthreads):
            t[j].start()
        # wait for threads to complete
        for j in range(nthreads):
            t[j].join()
    t2 = time.time()
    print("%d jobs executing in %d threads : %.1f seconds" % (njobs, nthreads, (t2 - t1)))
    # check data
    for i in range(njobs):
        if d[i].s != "XX":
            print("Ooops")

test(256, 1)
test(256, 2)
test(256, 4)
test(256, 8)
test(256, 16)
test(256, 32)
test(256, 64)

Example output:

256 jobs executing in 1 threads : 28.0 seconds
256 jobs executing in 2 threads : 14.0 seconds
256 jobs executing in 4 threads : 7.0 seconds
256 jobs executing in 8 threads : 3.5 seconds
256 jobs executing in 16 threads : 1.7 seconds
256 jobs executing in 32 threads : 0.9 seconds
256 jobs executing in 64 threads : 0.4 seconds

Python comes with a multi-processing module.

Nnote that the multi-processing module does not use threads. Instead it spawns/forks copies of the Python program in separate processes. This works around the GIL limitation. But obviously the overhead of process creation is larger than thread creation (specifics are OS dependent).

import multiprocessing
import time

class Data(object):
    def __init__(self, _s):
        self.s = _s

# non-OO as that is much simpler
def run(d, q, k):
    # simulate a lot of work that takes 0.1 second
    s = d.s
    time.sleep(0.1)
    d.s = s + "X"
    # send updated data back
    q.put([k, d])

def test(njobs, nthreads):
    # setup data
    d = []
    for i in range(njobs):
        d.append(Data("X"))
    # process
    t1 = time.time()
    q = multiprocessing.Queue(njobs)
    k = 0
    for i in range(njobs / nthreads): # // in Python 3.x
        # create threads
        t = []
        for j in range(nthreads):
            t.append(multiprocessing.Process(target=run, args=(d[k],q,k,)))
            k = k + 1
        # start threads
        for j in range(nthreads):
            t[j].start()
        # wait for threads to complete
        for j in range(nthreads):
            t[j].join()
    t2 = time.time()
    print("%d jobs executing in %d threads : %.1f seconds" % (njobs, nthreads, (t2 - t1)))
    # get all updated data
    while not q.empty():
        res = q.get()
        d[res[0]] = res[1]
    # check data
    for i in range(njobs):
        if d[i].s != "XX":
            print("Ooops" + d[i].s)

if __name__ == '__main__': # necesaary as copies are forked
    test(128, 1)
    test(128, 2)
    test(128, 4)
    test(128, 8)
    test(128, 16)
    test(128, 32)
    test(128, 64)

Example output:

128 jobs executing in 1 threads : 26.4 seconds
128 jobs executing in 2 threads : 14.0 seconds
128 jobs executing in 4 threads : 7.6 seconds
128 jobs executing in 8 threads : 5.4 seconds
128 jobs executing in 16 threads : 4.8 seconds
128 jobs executing in 32 threads : 4.6 seconds
128 jobs executing in 64 threads : 4.5 seconds

Note that the results are not totally deterministic - the exact times will vary between runs.

Also note that the reason why it scales so well is that the task is not CPU intensive. If the task was CPU intensive then it would only scale to about 4 threads because the system used is a single CPU with 4 cores system.

Thread pool

Some frameworks has introduced thread pools to make it easier to create multi-threaded applications.

Instead of the application explicit managing threads then the application just submits jobs to the thread pool and the thread pool run jobs on a pool of threads.

Java 1.5 (2004) added a bunch of multi threading support classes including a thread pool service to Java.

package threads;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Pool {
    public static class Data {
        private String s;
        public Data(String s) {
            this.s = s;
        }
        public String getS() {
            return s;
        }
        public void setS(String s) {
            this.s = s;
        }
    }
    public static class Processor implements Callable<Void> {
        private Data d;
        public Processor(Data d) {
            this.d = d;
        }
        @Override
        public Void call() {
            try {
                // simulate a lot of work that takes 0.1 second
                String s = d.getS();
                Thread.sleep(100);
                d.setS(s + 'X');
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    }
    public static void test(int njobs, int nthreads) {
        // setup data
        Data[] d = new Data[njobs];
        for(int i = 0; i < njobs; i++) {
            d[i] = new Data("X");
        }
        long t1 = System.currentTimeMillis();
        ExecutorService es = Executors.newFixedThreadPool(nthreads);
        // submit jobs
        @SuppressWarnings("unchecked")
        Future<Void>[] res = new Future[njobs];
        for(int i = 0; i < njobs; i++) {
            res[i] = es.submit(new Processor(d[i]));
        }
        // wait for jobs to complete
        for(int i = 0; i < njobs; i++) {
            try {
                res[i].get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        es.shutdown();
        long t2 = System.currentTimeMillis();
        System.out.printf("%d jobs executing in %d threads : %.1f seconds\n", njobs, nthreads, (t2 - t1) / 1000.0);
        // check data
        for(int i = 0; i < njobs; i++) {
            if(!d[i].getS().equals("XX")) System.out.println("Ooops");
        }
    }
    public static void main(String[] args) {
        test(256, 1);
        test(256, 2);
        test(256, 4);
        test(256, 8);
        test(256, 16);
        test(256, 32);
        test(256, 64);
    }
}

Example output:

256 jobs executing in 1 threads : 25.6 seconds
256 jobs executing in 2 threads : 12.8 seconds
256 jobs executing in 4 threads : 6.4 seconds
256 jobs executing in 8 threads : 3.2 seconds
256 jobs executing in 16 threads : 1.6 seconds
256 jobs executing in 32 threads : 0.8 seconds
256 jobs executing in 64 threads : 0.4 seconds

.NET always had a thread pool class.

using System;
using System.Threading;

namespace PoolOld
{
    public class Program
    {
        public class Counter
        {
            public int N { get; set; }
        }
        public class Data
        {
            public string S { get; set; }
        }
        public class Processor
        {
            private Data d;
            public Processor(Data d)
            {
                this.d = d;
            }
            public void Run(Object c)
            {
                // simulate a lot of work that takes 0.1 second
                string s = d.S;
                Thread.Sleep(100);
                d.S = s + "X";
                // count down
                lock(c)
                {
                    ((Counter)c).N--;
                }
            }
        }
        public static void Test(int njobs, int nthreads)
        {
            // setup data
            Data[] d = new Data[njobs];
            for(int i = 0; i < njobs; i++)
            {
                d[i] = new Data { S = "X" };
            }
            // process
            DateTime dt1 = DateTime.Now;
            ThreadPool.SetMinThreads(nthreads, nthreads);
            ThreadPool.SetMaxThreads(nthreads, nthreads);
            // submit jobs
            Counter c = new Counter { N = njobs };
            for(int i = 0; i < njobs; i++)
            {
                ThreadPool.QueueUserWorkItem(new WaitCallback((new Processor(d[i])).Run), c);
            }
            // wait for jobs to complete
            while(true)
            {
                lock(c)
                {
                    if(c.N <= 0) break;
                }
                Thread.Sleep(25);
            }
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} jobs executing in {1} threads : {2:F1} seconds", njobs, nthreads, (dt2 - dt1).TotalSeconds);
            // check data
            for(int i = 0; i < njobs; i++)
            {
                if(d[i].S != "XX") Console.WriteLine("Ooops");
            }
        }
        public static void Main(string[] args)
        {
            Test(256, 1);
            Test(256, 2);
            Test(256, 4);
            Test(256, 8);
            Test(256, 16);
            Test(256, 32);
            Test(256, 64);
        }
    }
}

Example output:

256 jobs executing in 1 threads : 25.6 seconds
256 jobs executing in 2 threads : 15.3 seconds
256 jobs executing in 4 threads : 7.6 seconds
256 jobs executing in 8 threads : 3.9 seconds
256 jobs executing in 16 threads : 2.8 seconds
256 jobs executing in 32 threads : 2.2 seconds
256 jobs executing in 64 threads : 2.0 seconds

.NET always had a thread pool class.

Imports System
Imports System.Threading

Namespace PoolOld
    Public Class Program
        Public Class Counter
            Public Property N() As Integer
        End Class
        Public Class Data
            Public Property S() As String
        End Class
        Public Class Processor
            Private d As Data
            Public Sub New(d As Data)
                Me.d = d
            End Sub
            Public Sub Run(c As Object)
                ' simulate a lot of work that takes 0.1 second
                Dim s As String = d.S
                Thread.Sleep(100)
                d.S = s & "X"
                ' count down
                SyncLock c
                    DirectCast(c, Counter).N -= 1
                End SyncLock
            End Sub
        End Class
        Public Shared Sub Test(njobs As Integer, nthreads As Integer)
            ' setup data
            Dim d As Data() = New Data(njobs - 1) {}
            For i As Integer = 0 To njobs - 1
                d(i) = New Data() With { .S = "X" }
            Next
            ' process
            Dim dt1 As DateTime = DateTime.Now
            ThreadPool.SetMinThreads(nthreads, nthreads)
            ThreadPool.SetMaxThreads(nthreads, nthreads)
            ' submit jobs
            Dim c As New Counter() With { .N = njobs }
            For i As Integer = 0 To njobs - 1
                ThreadPool.QueueUserWorkItem(New WaitCallback(AddressOf (New Processor(d(i))).Run), c)
            Next
            ' wait for jobs to complete
            While True
                SyncLock c
                    If c.N <= 0 Then
                        Exit While
                    End If
                End SyncLock
                Thread.Sleep(25)
            End While
            Dim dt2 As DateTime = DateTime.Now
            Console.WriteLine("{0} jobs executing in {1} threads : {2:F1} seconds", njobs, nthreads, (dt2 - dt1).TotalSeconds)
            ' check data
            For i As Integer = 0 To njobs - 1
                If d(i).S <> "XX" Then
                    Console.WriteLine("Ooops")
                End If
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Test(256, 1)
            Test(256, 2)
            Test(256, 4)
            Test(256, 8)
            Test(256, 16)
            Test(256, 32)
            Test(256, 64)
        End Sub
    End Class
End Namespace

Example output:

256 jobs executing in 1 threads : 25.6 seconds
256 jobs executing in 2 threads : 15.2 seconds
256 jobs executing in 4 threads : 7.7 seconds
256 jobs executing in 8 threads : 4.1 seconds
256 jobs executing in 16 threads : 2.8 seconds
256 jobs executing in 32 threads : 2.2 seconds
256 jobs executing in 64 threads : 1.8 seconds

.NET 4.0 (2010) added Task Parallel Library (TPL) including a new task scheduler to .NET.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace PoolNew
{
    public class Program
    {
        public class Data
        {
            public string S { get; set; }
        }
        public class Processor
        {
            private Data d;
            public Processor(Data d)
            {
                this.d = d;
            }
            public void Run()
            {
                // simulate a lot of work that takes 0.1 second
                string s = d.S;
                Thread.Sleep(100);
                d.S = s + "X";
            }
        }
        public static void Test(int njobs)
        {
            // setup data
            Data[] d = new Data[njobs];
            for(int i = 0; i < njobs; i++)
            {
                d[i] = new Data { S = "X" };
            }
            DateTime dt1 = DateTime.Now;
            // submit jobs
            Task[] t = new Task[njobs];
            for(int i = 0; i < njobs; i++)
            {
                t[i] = Task.Factory.StartNew((new Processor(d[i])).Run);
            }
            // wait for jobs to complete
            for(int i = 0; i < njobs; i++)
            {
                t[i].Wait();
            }
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} jobs executing in {1} threads : {2:F1} seconds", njobs, "many", (dt2 - dt1).TotalSeconds);
            // check data
            for(int i = 0; i < njobs; i++)
            {
                if(d[i].S != "XX") Console.WriteLine("Ooops");
            }
        }
        public static void Main(string[] args)
        {
            Test(256);
        }
    }
}

Note that it is non-trivial to control number of threads.

Example output:

256 jobs executing in many threads : 3.1 seconds

.NET 4.0 (2010) added Task Parallel Library (TPL) including a new task scheduler to .NET.

Imports System
Imports System.Threading
Imports System.Threading.Tasks

Namespace PoolNew
    Public Class Program
        Public Class Data
            Public Property S() As String
        End Class
        Public Class Processor
            Private d As Data
            Public Sub New(d As Data)
                Me.d = d
            End Sub
            Public Sub Run()
                ' simulate a lot of work that takes 0.1 second
                Dim s As String = d.S
                Thread.Sleep(100)
                d.S = s & "X"
            End Sub
        End Class
        Public Shared Sub Test(njobs As Integer)
            ' setup data
            Dim d As Data() = New Data(njobs - 1) {}
            For i As Integer = 0 To njobs - 1
                d(i) = New Data() With { .S = "X" }
            Next
            Dim dt1 As DateTime = DateTime.Now
            ' submit jobs
            Dim t As Task() = New Task(njobs - 1) {}
            For i As Integer = 0 To njobs - 1
                t(i) = Task.Factory.StartNew(AddressOf (New Processor(d(i))).Run)
            Next
            ' wait for jobs to complete
            For i As Integer = 0 To njobs - 1
                t(i).Wait()
            Next
            Dim dt2 As DateTime = DateTime.Now
            Console.WriteLine("{0} jobs executing in {1} threads : {2:F1} seconds", njobs, "many", (dt2 - dt1).TotalSeconds)
            ' check data
            For i As Integer = 0 To njobs - 1
                If d(i).S <> "XX" Then
                    Console.WriteLine("Ooops")
                End If
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Test(256)
        End Sub
    End Class
End Namespace

Note that it is non-trivial to control number of threads.

Example output:

256 jobs executing in many threads : 3.1 seconds

Python 3.2 (2011) introduced a thread pool that are very heavily inspired by the Java thread pool.

# requires Python >= 3.2
import concurrent.futures
import time

class Data(object):
    def __init__(self, _s):
        self.s = _s

# non-OO as that is much simpler
def run(arg):
    d = arg[0]
    # simulate a lot of work that takes 0.1 second
    s = d.s
    time.sleep(0.1)
    d.s = s + "X"
    return d

def test(njobs, nthreads):
    # setup data
    d = []
    for i in range(njobs):
        d.append(Data("X"))
    # process
    t1 = time.time()
    executor = concurrent.futures.ThreadPoolExecutor(nthreads)
    fut = []
    for i in range(njobs):
        fut.append(executor.submit(run, [d[i]]))
    for i in range(njobs):
        d[i] = fut[i].result()
    t2 = time.time()
    print("%d jobs executing in %d threads : %.1f seconds" % (njobs, nthreads, (t2 - t1)))
    # check data
    for i in range(njobs):
        if d[i].s != "XX":
            print("Ooops" + d[i].s)

if __name__ == '__main__': # necesaary as copies are forked
    test(256, 1)
    test(256, 2)
    test(256, 4)
    test(256, 8)
    test(256, 16)
    test(256, 32)
    test(256, 64)

Example output:

256 jobs executing in 1 threads : 28.0 seconds
256 jobs executing in 2 threads : 14.0 seconds
256 jobs executing in 4 threads : 7.0 seconds
256 jobs executing in 8 threads : 3.5 seconds
256 jobs executing in 16 threads : 1.7 seconds
256 jobs executing in 32 threads : 0.9 seconds
256 jobs executing in 64 threads : 0.4 seconds

The multi-processing module also have a thread/process pool.

The interface exposed by the pool is actually much nicer than the basic, so one should always use the pool.

import multiprocessing
import time

class Data(object):
    def __init__(self, _s):
        self.s = _s

# non-OO as that is much simpler
def run(d):
    # simulate a lot of work that takes 0.1 second
    s = d.s
    time.sleep(0.1)
    d.s = s + "X"
    return d

def test(njobs, nthreads):
    # setup data
    d = []
    for i in range(njobs):
        d.append(Data("X"))
    # process
    t1 = time.time()
    pool = multiprocessing.Pool(nthreads)
    res = []
    for i in range(njobs):
        res.append(pool.apply_async(run, [d[i]]))
    for i in range(njobs):
        d[i] = res[i].get()
    t2 = time.time()
    print("%d jobs executing in %d threads : %.1f seconds" % (njobs, nthreads, (t2 - t1)))
    # check data
    for i in range(njobs):
        if d[i].s != "XX":
            print("Ooops" + d[i].s)

if __name__ == '__main__': # necesaary as copies are forked
    test(128, 1)
    test(128, 2)
    test(128, 4)
    test(128, 8)
    test(128, 16)
    test(128, 32)
    test(128, 64)

Example output:

128 jobs executing in 1 threads : 14.1 seconds
128 jobs executing in 2 threads : 7.1 seconds
128 jobs executing in 4 threads : 3.6 seconds
128 jobs executing in 8 threads : 2.0 seconds
128 jobs executing in 16 threads : 1.2 seconds
128 jobs executing in 32 threads : 1.1 seconds
128 jobs executing in 64 threads : 1.8 seconds

Advanced

The addition of FP (Functional Programming) to some languages has made it possible to do multi-threading in a much simpler way.

Java 8 (2014) added streams and lambdas to Java.

package threads;

import java.util.Arrays;

public class Adv {
    public static class Data {
        private String s;
        public Data(String s) {
            this.s = s;
        }
        public String getS() {
            return s;
        }
        public void setS(String s) {
            this.s = s;
        }
    }
    public static class Processor {
        private Data d;
        public Processor(Data d) {
            this.d = d;
        }
        public void run() {
            try {
                // simulate a lot of work that takes 0.1 second
                String s = d.getS();
                Thread.sleep(100);
                d.setS(s + 'X');
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void test(boolean par, int njobs) {
        // setup data
        Data[] d = new Data[njobs];
        for(int i = 0; i < njobs; i++) {
            d[i] = new Data("X");
        }
        // process
        long t1 = System.currentTimeMillis();
        if(par) {
            Arrays.stream(d).parallel().forEach(o -> (new Processor(o)).run());
        } else {
            Arrays.stream(d).sequential().forEach(o -> (new Processor(o)).run());
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d jobs executing in %s threads : %.1f seconds\n", njobs, par ? "many" : "one", (t2 - t1) / 1000.0);
        // check data
        for(int i = 0; i < njobs; i++) {
            if(!d[i].getS().equals("XX")) System.out.println("Ooops");
        }
    }
    public static void main(String[] args) {
        test(false, 256);
        test(true, 256);
    }
}

Note that it is non-trivial to control number of threads.

Example output:

256 jobs executing in one threads : 25.7 seconds
256 jobs executing in many threads : 3.2 seconds

.NET 3.5 (2008) added LINQ and lambdas to .NET.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace Adv
{
    public class Program
    {
        public class Data
        {
            public string S { get; set; }
        }
        public class Processor
        {
            private Data d;
            public Processor(Data d)
            {
                this.d = d;
            }
            public void Run()
            {
                // simulate a lot of work that takes 0.1 second
                string s = d.S;
                Thread.Sleep(100);
                d.S = s + "X";
            }
        }
        public static void Test(bool par, int njobs)
        {
            // setup data
            Data[] d = new Data[njobs];
            for(int i = 0; i < njobs; i++)
            {
                d[i] = new Data { S = "X" };
            }
            // process
            DateTime dt1 = DateTime.Now;
            if(par)
            {
                d.AsParallel().ForAll(o => (new Processor(o)).Run());
            }
            else
            {
                d.ToList().ForEach(o => (new Processor(o)).Run());
            }
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} jobs executing in {1} threads : {2:F1} seconds", njobs, par ? "many" : "one", (dt2 - dt1).TotalSeconds);
            // check data
            for(int i = 0; i < njobs; i++)
            {
                if(d[i].S != "XX") Console.WriteLine("Ooops");
            }
        }
        public static void Main(string[] args)
        {
            Test(false, 256);
            Test(true,  256);
        }
    }
}

Note that it is non-trivial to control number of threads.

Example output:

256 jobs executing in one threads : 25.6 seconds
256 jobs executing in many threads : 3.2 seconds

.NET 3.5 (2008) added LINQ and lambdas to .NET.

Imports System
Imports System.Collections.Generic
Imports System.Linq
Imports System.Threading

Namespace Adv
    Public Class Program
        Public Class Data
            Public Property S() As String
        End Class
        Public Class Processor
            Private d As Data
            Public Sub New(d As Data)
                Me.d = d
            End Sub
            Public Sub Run()
                ' simulate a lot of work that takes 0.1 second
                Dim s As String = d.S
                Thread.Sleep(100)
                d.S = s & "X"
            End Sub
        End Class
        Public Shared Sub Test(par As Boolean, njobs As Integer)
            ' setup data
            Dim d As Data() = New Data(njobs - 1) {}
            For i As Integer = 0 To njobs - 1
                d(i) = New Data() With { .S = "X" }
            Next
            ' process
            Dim dt1 As DateTime = DateTime.Now
            If par Then
                d.AsParallel().ForAll(Sub(o As Data) Call (New Processor(o)).Run())
            Else
                d.ToList().ForEach(Sub(o As Data) Call (New Processor(o)).Run())
            End If
            Dim dt2 As DateTime = DateTime.Now
            Console.WriteLine("{0} jobs executing in {1} threads : {2:F1} seconds", njobs, If(par, "many", "one"), (dt2 - dt1).TotalSeconds)
            ' check data
            For i As Integer = 0 To njobs - 1
                If d(i).S <> "XX" Then
                    Console.WriteLine("Ooops")
                End If
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Test(False, 256)
            Test(True, 256)
        End Sub
    End Class
End Namespace

Note that it is non-trivial to control number of threads.

Example output:

256 jobs executing in one threads : 25.6 seconds
256 jobs executing in many threads : 3.3 seconds

Synchronization

There are some potential concurrency problems that may require synchronization.

First there is the traditional concurrent access problem:

works fine, but:

does not produce the expected result.

The solution is to force thread #2 to wait working on M until thread #1 is done working on M - to synchronize the threads.

The second problem is much more subtle. It is the change visibility problem.

We think of a multi-threaded application running as:

How we think

But it really runs as:

How it is

To avoid problems it is necessary to synchronize CPU specific caches with memory.

Most programming languages / frameworks does that when:

(most is not all so remember to check the specifics for what you use)

So the classic construct:

does not work unless the stop variable is declared volatile or synchronization is used.

A third problem is that some modern CPU's reorder memory reads and/or writes behind the scene as part of optimization. This can create inconsistencies where one thread first write to M1 and then to M2, but the CPU reorders so that another thread can first read the new value from M2 and then the old value from M1.

The solution is do what is called a memory barrier. The same 3 items that typical handle the visbility problem typical also handle the reordering problem.

Let us first see some code that is not working correctly:

package threads;

public class NotCorrect {
    public static class Data {
        private String s;
        public Data(String s) {
            this.s = s;
        }
        public String getS() {
            return s;
        }
        public void setS(String s) {
            this.s = s;
        }
    }
    public static class Processor extends Thread {
        private Data d;
        public Processor(Data d) {
            this.d = d;
        }
        @Override
        public void run() {
            // simulate some work between get and set
            String s = d.getS();
            Thread.yield();
            d.setS(s + 'X');
        }
    }
    public static void test(int njobs, int nthreads) {
        // setup data
        Data d = new Data("");
        // process with shared data
        for(int i = 0; i < njobs / nthreads; i++) {
            // create threads
            Thread[] t = new Thread[nthreads];
            for(int j = 0; j < nthreads; j++) {
                t[j] = new Processor(d);
            }
            // start threads
            for(int j = 0; j < nthreads; j++) {
                t[j].start();
            }
            // wait for threads to complete
            for(int j = 0; j < nthreads; j++) {
                try {
                    t[j].join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        System.out.printf("%d threads : expected = %d, actual = %d\n", nthreads, njobs, d.getS().length());
    }
    public static void main(String[] args) {
        test(2560, 1);
        test(2560, 2);
        test(2560, 4);
        test(2560, 8);
        test(2560, 16);
        test(2560, 32);
        test(2560, 64);
    }
}

Example output:

1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2558
4 threads : expected = 2560, actual = 2506
8 threads : expected = 2560, actual = 2462
16 threads : expected = 2560, actual = 2450
32 threads : expected = 2560, actual = 2444
64 threads : expected = 2560, actual = 2403
using System;
using System.Threading;

namespace NotCorrect
{
    public class Program
    {
        public class Data
        {
            public string S { get; set; }
        }
        public class Processor
        {
            private Data d;
            public Processor(Data d)
            {
                this.d = d;
            }
            public void Run()
            {
                // simulate some work between get and set
                string s = d.S;
                Thread.Yield();
                d.S = s + "X";
            }
        }
        public static void Test(int njobs, int nthreads)
        {
            // setup data
            Data d = new Data { S = "" };
            // process with shared data
            for(int i = 0; i < njobs / nthreads; i++)
            {
                // create threads
                Thread[] t = new Thread[nthreads];
                for(int j = 0; j < nthreads; j++)
                {
                    t[j] = new Thread((new Processor(d)).Run);
                }
                // start threads
                for(int j = 0; j < nthreads; j++)
                {
                    t[j].Start();
                }
                // wait for threads to complete
                for(int j = 0; j < nthreads; j++)
                {
                    t[j].Join();
                }
            }
            Console.WriteLine("{0} threads : expected = {1}, actual = {2}", nthreads, njobs, d.S.Length);
        }
        public static void Main(string[] args)
        {
            Test(2560, 1);
            Test(2560, 2);
            Test(2560, 4);
            Test(2560, 8);
            Test(2560, 16);
            Test(2560, 32);
            Test(2560, 64);
            Console.ReadKey();
        }
    }
}

Example output:

1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2544
8 threads : expected = 2560, actual = 2535
16 threads : expected = 2560, actual = 2525
32 threads : expected = 2560, actual = 2509
64 threads : expected = 2560, actual = 2514
Imports System
Imports System.Threading

Namespace NotCorrect
    Public Class Program
        Public Class Data
            Public Property S() As String
        End Class
        Public Class Processor
            Private d As Data
            Public Sub New(d As Data)
                Me.d = d
            End Sub
            Public Sub Run()
                ' simulate some work between get and set
                Dim s As String = d.S
                Thread.Yield()
                d.S = s & "X"
            End Sub
        End Class
        Public Shared Sub Test(njobs As Integer, nthreads As Integer)
            ' setup data
            Dim d As New Data() With { .S = "" }
            ' process with shared data
            For i As Integer = 0 To njobs \ nthreads - 1
                ' create threads
                Dim t As Thread() = New Thread(nthreads - 1) {}
                For j As Integer = 0 To nthreads - 1
                    t(j) = New Thread(AddressOf (New Processor(d)).Run)
                Next
                ' start threads
                For j As Integer = 0 To nthreads - 1
                    t(j).Start()
                Next
                ' wait for threads to complete
                For j As Integer = 0 To nthreads - 1
                    t(j).Join()
                Next
            Next
            Console.WriteLine("{0} threads : expected = {1}, actual = {2}", nthreads, njobs, d.S.Length)
        End Sub
        Public Shared Sub Main(args As String())
            Test(2560, 1)
            Test(2560, 2)
            Test(2560, 4)
            Test(2560, 8)
            Test(2560, 16)
            Test(2560, 32)
            Test(2560, 64)
            Console.ReadKey()
        End Sub
    End Class
End Namespace

Example output:

1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2558
8 threads : expected = 2560, actual = 2538
16 threads : expected = 2560, actual = 2534
32 threads : expected = 2560, actual = 2525
64 threads : expected = 2560, actual = 2492
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <windows.h>
#include <process.h>

#define MAX_STR_LEN 8192

struct data {
    char s[MAX_STR_LEN];
};

unsigned int __stdcall run(void *p)
{
    struct data *d;
    char s[MAX_STR_LEN];
    d = p;
    strcpy(s, d->s);
    Sleep(0);
    strcpy(d->s, s);
    strcat(d->s, "X");
    return 0;
}

void test(int njobs, int nthreads)
{
    struct data d;
    HANDLE *t;
    int i, j;
    /* setup data */
    strcpy(d.s, "");
    /* process with shared data */
    for(i = 0; i < njobs / nthreads; i++)
    {
        /* create and start threads */
        t = malloc(nthreads * sizeof(HANDLE));
        for(j = 0; j < nthreads; j++)
        {
            t[j] = (HANDLE)_beginthreadex(NULL, 0, run, &d, 0, NULL);
        }
        /* wait for threads to complete */
        for(j = 0; j < nthreads; j++)
        {
            WaitForSingleObject(t[j], INFINITE);
            CloseHandle(t[j]);
        }
        free(t);
    }
    printf("%d threads : expected = %d, actual = %d\n", nthreads, njobs, (int)strlen(d.s));
}

int main()
{
    test(2560, 1);
    test(2560, 2);
    test(2560, 4);
    test(2560, 8);
    test(2560, 16);
    test(2560, 32);
    test(2560, 64);
    return 0;
}

Example output:

1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2557
8 threads : expected = 2560, actual = 2556
16 threads : expected = 2560, actual = 2550
32 threads : expected = 2560, actual = 2526
64 threads : expected = 2560, actual = 2548
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <unistd.h>
#include <pthread.h>

#define MAX_STR_LEN 8192

struct data {
    char s[MAX_STR_LEN];
};

void *run(void *p)
{
    struct data *d;
    char s[MAX_STR_LEN];
    d = p;
    strcpy(s, d->s);
    usleep(1);
    strcpy(d->s, s);
    strcat(d->s, "X");
    return NULL;
}

void test(int njobs, int nthreads)
{
    struct data d;
    pthread_t *t;
    int i, j;
    void *p;
    /* setup data */
    strcpy(d.s, "");
    /* process with shared data */
    for(i = 0; i < njobs / nthreads; i++)
    {
        /* create and start threads */
        t = malloc(nthreads * sizeof(pthread_t));
        for(j = 0; j < nthreads; j++)
        {
            pthread_create(&t[j], NULL, run, &d);
        }
        /* wait for threads to complete */
        for(j = 0; j < nthreads; j++)
        {
            pthread_join(t[j], &p);
        }
        free(t);
    }
    printf("%d threads : expected = %d, actual = %d\n", nthreads, njobs, (int)strlen(d.s));
}

int main()
{
    test(2560, 1);
    test(2560, 2);
    test(2560, 4);
    test(2560, 8);
    test(2560, 16);
    test(2560, 32);
    test(2560, 64);
    return 0;
}

Example output:

1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 1378
4 threads : expected = 2560, actual = 670
8 threads : expected = 2560, actual = 326
16 threads : expected = 2560, actual = 162
32 threads : expected = 2560, actual = 81
64 threads : expected = 2560, actual = 40
program NotCorrect;

uses
  Classes, SysUtils, Windows;

type
  Data = class(TObject)
    constructor Create(s : string);
    function GetS : string;
    procedure SetS(s : string);
    property S : string read GetS write SetS;
  private
    _s : string;
  end;
  Processor = class(TThread)
    constructor Create(d : Data);
    function GetD : Data;
    property D : Data read GetD;
  protected
    procedure Execute; override;
  private
    _d : Data;
  end;

constructor Data.Create(s : string);

begin
  _s := s;
end;

function Data.GetS : string;

begin
  GetS := _s;
end;

procedure Data.SetS(s : string);

begin
  _s := s;
end;

constructor Processor.Create(d : Data);

begin
  inherited Create(true);
  _d := d;
end;

function Processor.GetD : Data;

begin
  GetD := _d;
end;

procedure Processor.Execute;

var
  s : string;

begin
  (* simulate some work between get and set *)
  s := D.S;
  Yield;
  D.S := s + 'X';
end;

procedure Test(njobs, nthreads : integer);

var
  d : Data;
  t : array of TThread;
  i, j : integer;

begin
  (* setup data*)
  d := Data.Create('');
  (* process with sharted data *)
  for i := 0 to (njobs div nthreads - 1) do begin
    SetLength(t, nthreads);
    (* create threads *)
    for j := 0 to nthreads-1 do begin
      t[j] := Processor.Create(d);
    end;
    (* start threads *)
    for j := 0 to nthreads-1 do begin
      t[j].Start;
    end;
    (* wait for threads to complete *)
    for j := 0 to nthreads-1 do begin
      t[j].WaitFor;
      t[j].Terminate;
      t[j].Free;
    end;
  end;
  writeln(nthreads:1,' threads : expected = ',njobs:1,', actual = ', length(d.S):1);
end;

begin
  Test(2560, 1);
  Test(2560, 2);
  Test(2560, 4);
  Test(2560, 8);
  Test(2560, 16);
  Test(2560, 32);
  Test(2560, 64);
end.

Example output:

1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2559
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2332
import threading
import time

class Data(object):
    def __init__(self, _s):
        self.s = _s

class Processor(threading.Thread):
    def __init__(self, _d):
        super(Processor, self).__init__()
        self.d = _d
    def run(self):
        # simulate some work between get and set
        s = self.d.s
        time.sleep(0.001)
        self.d.s = s + "X"

def test(njobs, nthreads):
    # setup data
    d = Data("")
    # process
    for i in range(njobs / nthreads): # // in Python 3.x
        # create threads
        t = []
        for j in range(nthreads):
            t.append(Processor(d))
        # start threads
        for j in range(nthreads):
            t[j].start()
        # wait for threads to complete
        for j in range(nthreads):
            t[j].join()
    print("%d threads : expected = %d, actual = %d" % (nthreads, njobs, len(d.s)))

test(2560, 1)
test(2560, 2)
test(2560, 4)
test(2560, 8)
test(2560, 16)
test(2560, 32)
test(2560, 64)

Example output:

1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 1280
4 threads : expected = 2560, actual = 640
8 threads : expected = 2560, actual = 320
16 threads : expected = 2560, actual = 160
32 threads : expected = 2560, actual = 80
64 threads : expected = 2560, actual = 40
(CPython - Jython results are more random like)

The above code is only guaranteed to work for 1 thread. It is not guaranteed to work with more than 1 thread. That does not mean that it is guaranteed to fail. It may accidentally work fine some time or even most times. But it can fail.

Also note that it can fail in different ways. It can complete and produce a wrong result. But it can also crash the program.

And experience shows that non-correct multi-threaded code often works fine in test and first start showing errors in production.

And now let us see the same code with proper synchronization in place to guarantee that the code runs correct:

package threads;

public class Correct {
    public static class Data {
        private String s;
        public Data(String s) {
            this.s = s;
        }
        public String getS() {
            return s;
        }
        public void setS(String s) {
            this.s = s;
        }
    }
    public static class Processor extends Thread {
        private Data d;
        public Processor(Data d) {
            this.d = d;
        }
        @Override
        public void run() {
            // simulate some work between get and set
            synchronized(d) {
                String s = d.getS();
                Thread.yield();
                d.setS(s + 'X');
            }
        }
    }
    public static void test(int njobs, int nthreads) {
        // setup data
        Data d = new Data("");
        // process with shared data
        for(int i = 0; i < njobs / nthreads; i++) {
            // create threads
            Thread[] t = new Thread[nthreads];
            for(int j = 0; j < nthreads; j++) {
                t[j] = new Processor(d);
            }
            // start threads
            for(int j = 0; j < nthreads; j++) {
                t[j].start();
            }
            // wait for threads to complete
            for(int j = 0; j < nthreads; j++) {
                try {
                    t[j].join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        System.out.printf("%d threads : expected = %d, actual = %d\n", nthreads, njobs, d.getS().length());
    }
    public static void main(String[] args) {
        test(2560, 1);
        test(2560, 2);
        test(2560, 4);
        test(2560, 8);
        test(2560, 16);
        test(2560, 32);
        test(2560, 64);
    }
}

Example output:

1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560
using System;
using System.Threading;

namespace Correct
{
    public class Program
    {
        public class Data
        {
            public string S { get; set; }
        }
        public class Processor
        {
            private Data d;
            public Processor(Data d)
            {
                this.d = d;
            }
            public void Run()
            {
                // simulate some work between get and set
                lock(d)
                {
                    string s = d.S;
                    Thread.Yield();
                    d.S = s + "X";
                }
            }
        }
        public static void Test(int njobs, int nthreads)
        {
            // setup data
            Data d = new Data { S = "" };
            // process with shared data
            for(int i = 0; i < njobs / nthreads; i++)
            {
                // create threads
                Thread[] t = new Thread[nthreads];
                for(int j = 0; j < nthreads; j++)
                {
                    t[j] = new Thread((new Processor(d)).Run);
                }
                // start threads
                for(int j = 0; j < nthreads; j++)
                {
                    t[j].Start();
                }
                // wait for threads to complete
                for(int j = 0; j < nthreads; j++)
                {
                    t[j].Join();
                }
            }
            Console.WriteLine("{0} threads : expected = {1}, actual = {2}", nthreads, njobs, d.S.Length);
        }
        public static void Main(string[] args)
        {
            Test(2560, 1);
            Test(2560, 2);
            Test(2560, 4);
            Test(2560, 8);
            Test(2560, 16);
            Test(2560, 32);
            Test(2560, 64);
            Console.ReadKey();
        }
    }
}

Example output:

1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560
Imports System
Imports System.Threading

Namespace Correct
    Public Class Program
        Public Class Data
            Public Property S() As String
        End Class
        Public Class Processor
            Private d As Data
            Public Sub New(d As Data)
                Me.d = d
            End Sub
            Public Sub Run()
                ' simulate some work between get and set
                SyncLock d
                    Dim s As String = d.S
                    Thread.Yield()
                    d.S = s & "X"
                End SyncLock
            End Sub
        End Class
        Public Shared Sub Test(njobs As Integer, nthreads As Integer)
            ' setup data
            Dim d As New Data() With { .S = "" }
            ' process with shared data
            For i As Integer = 0 To njobs \ nthreads - 1
                ' create threads
                Dim t As Thread() = New Thread(nthreads - 1) {}
                For j As Integer = 0 To nthreads - 1
                    t(j) = New Thread(AddressOf (New Processor(d)).Run)
                Next
                ' start threads
                For j As Integer = 0 To nthreads - 1
                    t(j).Start()
                Next
                ' wait for threads to complete
                For j As Integer = 0 To nthreads - 1
                    t(j).Join()
                Next
            Next
            Console.WriteLine("{0} threads : expected = {1}, actual = {2}", nthreads, njobs, d.S.Length)
        End Sub
        Public Shared Sub Main(args As String())
            Test(2560, 1)
            Test(2560, 2)
            Test(2560, 4)
            Test(2560, 8)
            Test(2560, 16)
            Test(2560, 32)
            Test(2560, 64)
            Console.ReadKey()
        End Sub
    End Class
End Namespace

Example output:

1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <windows.h>
#include <process.h>

#define MAX_STR_LEN 8192

struct data {
    char s[MAX_STR_LEN];
};

static CRITICAL_SECTION cs;

unsigned int __stdcall run(void *p)
{
    struct data *d;
    char s[MAX_STR_LEN];
    d = p;
    EnterCriticalSection(&cs);
    strcpy(s, d->s);
    Sleep(0);
    strcpy(d->s, s);
    strcat(d->s, "X");
    LeaveCriticalSection(&cs);
    return 0;
}

void test(int njobs, int nthreads)
{
    struct data d;
    HANDLE *t;
    int i, j;
    /* setup data */
    strcpy(d.s, "");
    /* process with shared data */
    InitializeCriticalSection(&cs);
    for(i = 0; i < njobs / nthreads; i++)
    {
        /* create and start threads */
        t = malloc(nthreads * sizeof(HANDLE));
        for(j = 0; j < nthreads; j++)
        {
            t[j] = (HANDLE)_beginthreadex(NULL, 0, run, &d, 0, NULL);
        }
        /* wait for threads to complete */
        for(j = 0; j < nthreads; j++)
        {
            WaitForSingleObject(t[j], INFINITE);
            CloseHandle(t[j]);
        }
        free(t);
    }
    DeleteCriticalSection(&cs);
    printf("%d threads : expected = %d, actual = %d\n", nthreads, njobs, (int)strlen(d.s));
}

int main()
{
    test(2560, 1);
    test(2560, 2);
    test(2560, 4);
    test(2560, 8);
    test(2560, 16);
    test(2560, 32);
    test(2560, 64);
    return 0;
}

Example output:

1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <unistd.h>
#include <pthread.h>

#define MAX_STR_LEN 8192

struct data {
    char s[MAX_STR_LEN];
};

static pthread_mutex_t mtx;

void *run(void *p)
{
    struct data *d;
    char s[MAX_STR_LEN];
    d = p;
    pthread_mutex_lock(&mtx);
    strcpy(s, d->s);
    usleep(1);
    strcpy(d->s, s);
    strcat(d->s, "X");
    pthread_mutex_unlock(&mtx);
    return NULL;
}

void test(int njobs, int nthreads)
{
    struct data d;
    pthread_t *t;
    int i, j;
    void *p;
    pthread_mutex_init(&mtx, NULL);
    /* setup data */
    strcpy(d.s, "");
    /* process with shared data */
    for(i = 0; i < njobs / nthreads; i++)
    {
        /* create and start threads */
        t = malloc(nthreads * sizeof(pthread_t));
        for(j = 0; j < nthreads; j++)
        {
            pthread_create(&t[j], NULL, run, &d);
        }
        /* wait for threads to complete */
        for(j = 0; j < nthreads; j++)
        {
            pthread_join(t[j], &p);
        }
        free(t);
    }
    printf("%d threads : expected = %d, actual = %d\n", nthreads, njobs, (int)strlen(d.s));
    pthread_mutex_destroy(&mtx);
}

int main()
{
    test(2560, 1);
    test(2560, 2);
    test(2560, 4);
    test(2560, 8);
    test(2560, 16);
    test(2560, 32);
    test(2560, 64);
    return 0;
}

Example output:

1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560
program Correct;

uses
  Classes, SysUtils, Windows;

var
  cs : TRTLCriticalSection;

type
  Data = class(TObject)
    constructor Create(s : string);
    function GetS : string;
    procedure SetS(s : string);
    property S : string read GetS write SetS;
  private
    _s : string;
  end;
  Processor = class(TThread)
    constructor Create(d : Data);
    function GetD : Data;
    property D : Data read GetD;
  protected
    procedure Execute; override;
  private
    _d : Data;
  end;

constructor Data.Create(s : string);

begin
  _s := s;
end;

function Data.GetS : string;

begin
  GetS := _s;
end;

procedure Data.SetS(s : string);

begin
  _s := s;
end;

constructor Processor.Create(d : Data);

begin
  inherited Create(true);
  _d := d;
end;

function Processor.GetD : Data;

begin
  GetD := _d;
end;

procedure Processor.Execute;

var
  s : string;

begin
  (* simulate some work between get and set *)
  EnterCriticalSection(cs);
  s := D.S;
  Yield;
  D.S := s + 'X';
  LeaveCriticalSection(cs);
end;

procedure Test(njobs, nthreads : integer);

var
  d : Data;
  t : array of TThread;
  i, j : integer;

begin
  InitCriticalSection(cs);
  (* setup data*)
  d := Data.Create('');
  (* process with sharted data *)
  for i := 0 to (njobs div nthreads - 1) do begin
    SetLength(t, nthreads);
    (* create threads *)
    for j := 0 to nthreads-1 do begin
      t[j] := Processor.Create(d);
    end;
    (* start threads *)
    for j := 0 to nthreads-1 do begin
      t[j].Start;
    end;
    (* wait for threads to complete *)
    for j := 0 to nthreads-1 do begin
      t[j].WaitFor;
      t[j].Terminate;
      t[j].Free;
    end;
  end;
  writeln(nthreads:1,' threads : expected = ',njobs:1,', actual = ', length(d.S):1);
  DeleteCriticalSection(cs);
end;

begin
  Test(2560, 1);
  Test(2560, 2);
  Test(2560, 4);
  Test(2560, 8);
  Test(2560, 16);
  Test(2560, 32);
  Test(2560, 64);
end.

Example output:

1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560
import threading
import time

class Data(object):
    def __init__(self, _s):
        self.s = _s

lck = threading.RLock()

class Processor(threading.Thread):
    def __init__(self, _d):
        super(Processor, self).__init__()
        self.d = _d
    def run(self):
        # simulate some work between get and set
        lck.acquire(1)
        s = self.d.s
        time.sleep(0.001)
        self.d.s = s + "X"
        lck.release()

def test(njobs, nthreads):
    # setup data
    d = Data("")
    # process
    for i in range(njobs / nthreads): # // in Python 3.x
        # create threads
        t = []
        for j in range(nthreads):
            t.append(Processor(d))
        # start threads
        for j in range(nthreads):
            t[j].start()
        # wait for threads to complete
        for j in range(nthreads):
            t[j].join()
    print("%d threads : expected = %d, actual = %d" % (nthreads, njobs, len(d.s)))

test(2560, 1)
test(2560, 2)
test(2560, 4)
test(2560, 8)
test(2560, 16)
test(2560, 32)
test(2560, 64)

Example output:

1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560

Desktop GUI

It is common for desktop GUI frameworks to have all GUI interactions running in an event thread.

This means that any atempts to do something long running directly as a GUI action will hang the GUI. Instead a new thread should be started to do the long running task - that will keep the GUI responsive.

Server

For standalone server applications it is up to the developer to manage threads, but for applications running within a server framework, then it is common for the framework to handle threads.

Java web container (Tomcat, Jetty etc.):

Full Java EE application server (JBoss, WebSphere, WebLogic etc.):

ASP.NET:

Apache 2.x, mod_php and PHP:

Article history:

Version Date Description
1.0 August 31st 2018 Initial version based on old article (in Danish) on Eksperten.dk
1.1 December 23rd 2018 Add Python

Other articles:

See list of all articles here

Comments:

Please send comments to Arne Vajhøj