Threads and multi-threaded applications are important concepts in modern software that any software developer needs to understand.
A thread is a basic execution unit in most modern operating systems.
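A process is a running instance of an application. It owns resources such as memory and open files.
A process has one or more associated threads.
The threads of a process share its heap (and the rest of its memory), but each thread has its own stack.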
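A minimal Python sketch of that split follows; the names `worker` and `shared` are purely illustrative. Every thread sees the same list object on the heap. Each call's local variable belongs to that thread's own call stack, so threads never see each other's locals.

```python
import threading

shared = []              # a single list object on the heap, visible to all threads
lock = threading.Lock()  # protects the shared list while a thread appends to it

def worker(n):
    local_total = 0      # local variable: each thread has its own copy
    for i in range(n):
        local_total += i
    with lock:
        shared.append(local_total)  # every thread appends to the same list

threads = [threading.Thread(target=worker, args=(n,)) for n in (10, 100, 1000)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(shared))  # [45, 4950, 499500]
```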
An application with only one thread is called a single-threaded application. An application with more than one thread is called a multi-threaded application.
A multi-threaded application can do multiple things in parallel. This can be very convenient, both to make the code design cleaner and to reduce running time.
Note that it is possible, and in fact very common, to have more threads than can actually execute in parallel. Say we have a server with 2 CPUs, each with 8 cores and hyperthreading (aka 2s16c32t). The server can execute 32 threads in parallel, but it is not a problem for a server application to start 500 or 1000 threads. It is often beneficial, because the majority of threads are typically waiting for something (IO, a lock, a timer) and are not ready to execute.
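The following is a minimal Python sketch of threads that mostly wait. It assumes 200 threads, each waiting 0.1 second; both numbers are arbitrary choices for illustration. Because a sleeping thread is not ready to run, the waits overlap even when there are far more threads than cores.

```python
import threading
import time

NTHREADS = 200   # far more threads than a typical machine has cores
WAIT = 0.1       # each thread only waits, simulating IO

def wait_for_io():
    time.sleep(WAIT)  # a sleeping thread is not ready to run and uses no CPU

start = time.time()
threads = [threading.Thread(target=wait_for_io) for _ in range(NTHREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.time() - start

# One after another this would take NTHREADS * WAIT = 20 seconds;
# with overlapping waits it normally takes a small fraction of that.
print("%d threads finished in %.2f seconds" % (NTHREADS, elapsed))
```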
25 years ago it would have been relevant to distinguish between kernel mode threads and user mode (aka green) threads. Kernel mode threads are threads actually scheduled by the operating system. User mode threads are an abstraction handled by a runtime sitting between the application and the operating system. Today practically all operating systems support kernel mode threads, so user mode threads are no longer interesting. User mode threads also have a problem: an action that would block only one kernel mode thread (for example a blocking system call) can block all user mode threads.
The basic thread operations are: create a thread, start it, and wait for it to complete.
A multi-threaded application can be written using just those basic constructs.
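Below is a minimal Python sketch of creating a thread, starting it and waiting for it to complete. Every example in this article follows the same pattern. `job` and `results` are illustrative names only.

```python
import threading

results = {}

def job(name):
    # the code that runs in the new thread
    results[name] = name.upper()

# create the thread: nothing runs yet
t = threading.Thread(target=job, args=("abc",))
# start it: job now runs concurrently with the main thread
t.start()
# wait for it to complete
t.join()

print(results)  # {'abc': 'ABC'}
```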
Thread support is built into Java.
Java has two ways to define what will run in a thread.
One way is to create a class that extends Thread and overrides the run method.
package threads;
public class Basic1 {
public static class Data {
private String s;
public Data(String s) {
this.s = s;
}
public String getS() {
return s;
}
public void setS(String s) {
this.s = s;
}
}
public static class Processor extends Thread {
private Data d;
public Processor(Data d) {
this.d = d;
}
@Override
public void run() {
try {
// simulate a lot of work that takes 0.1 second
String s = d.getS();
Thread.sleep(100);
d.setS(s + 'X');
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void test(int njobs, int nthreads) {
// setup data
Data[] d = new Data[njobs];
for(int i = 0; i < njobs; i++) {
d[i] = new Data("X");
}
// process
long t1 = System.currentTimeMillis();
int k = 0;
for(int i = 0; i < njobs / nthreads; i++) {
// create threads
Thread[] t = new Thread[nthreads];
for(int j = 0; j < nthreads; j++) {
t[j] = new Processor(d[k]);
k++;
}
// start threads
for(int j = 0; j < nthreads; j++) {
t[j].start();
}
// wait for threads to complete
for(int j = 0; j < nthreads; j++) {
try {
t[j].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
long t2 = System.currentTimeMillis();
System.out.printf("%d jobs executing in %d threads : %.1f seconds\n", njobs, nthreads, (t2 - t1) / 1000.0);
// check data
for(int i = 0; i < njobs; i++) {
if(!d[i].getS().equals("XX")) System.out.println("Ooops");
}
}
public static void main(String[] args) {
test(256, 1);
test(256, 2);
test(256, 4);
test(256, 8);
test(256, 16);
test(256, 32);
test(256, 64);
}
}
Example output:
256 jobs executing in 1 threads : 25.7 seconds
256 jobs executing in 2 threads : 12.8 seconds
256 jobs executing in 4 threads : 6.4 seconds
256 jobs executing in 8 threads : 3.2 seconds
256 jobs executing in 16 threads : 1.6 seconds
256 jobs executing in 32 threads : 0.9 seconds
256 jobs executing in 64 threads : 0.5 seconds
Another way is to create a class that implements Runnable and its run method, and then pass an instance of it to the Thread constructor.
package threads;
public class Basic2 {
public static class Data {
private String s;
public Data(String s) {
this.s = s;
}
public String getS() {
return s;
}
public void setS(String s) {
this.s = s;
}
}
public static class Processor implements Runnable {
private Data d;
public Processor(Data d) {
this.d = d;
}
@Override
public void run() {
try {
// simulate a lot of work that takes 0.1 second
String s = d.getS();
Thread.sleep(100);
d.setS(s + 'X');
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void test(int njobs, int nthreads) {
// setup data
Data[] d = new Data[njobs];
for(int i = 0; i < njobs; i++) {
d[i] = new Data("X");
}
// process
long t1 = System.currentTimeMillis();
int k = 0;
for(int i = 0; i < njobs / nthreads; i++) {
// create threads
Thread[] t = new Thread[nthreads];
for(int j = 0; j < nthreads; j++) {
t[j] = new Thread(new Processor(d[k]));
k++;
}
// start threads
for(int j = 0; j < nthreads; j++) {
t[j].start();
}
// wait for threads to complete
for(int j = 0; j < nthreads; j++) {
try {
t[j].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
long t2 = System.currentTimeMillis();
System.out.printf("%d jobs executing in %d threads : %.1f seconds\n", njobs, nthreads, (t2 - t1) / 1000.0);
// check data
for(int i = 0; i < njobs; i++) {
if(!d[i].getS().equals("XX")) System.out.println("Ooops");
}
}
public static void main(String[] args) {
test(256, 1);
test(256, 2);
test(256, 4);
test(256, 8);
test(256, 16);
test(256, 32);
test(256, 64);
}
}
Example output:
256 jobs executing in 1 threads : 25.7 seconds
256 jobs executing in 2 threads : 12.8 seconds
256 jobs executing in 4 threads : 6.5 seconds
256 jobs executing in 8 threads : 3.3 seconds
256 jobs executing in 16 threads : 1.6 seconds
256 jobs executing in 32 threads : 0.9 seconds
256 jobs executing in 64 threads : 0.4 seconds
.NET comes with a Thread class.
using System;
using System.Threading;
namespace Basic
{
public class Program
{
public class Data
{
public string S { get; set; }
}
public class Processor
{
private Data d;
public Processor(Data d)
{
this.d = d;
}
public void Run()
{
// simulate a lot of work that takes 0.1 second
string s = d.S;
Thread.Sleep(100);
d.S = s + "X";
}
}
public static void Test(int njobs, int nthreads)
{
// setup data
Data[] d = new Data[njobs];
for(int i = 0; i < njobs; i++)
{
d[i] = new Data { S = "X" };
}
// process
DateTime dt1 = DateTime.Now;
int k = 0;
for(int i = 0; i < njobs / nthreads; i++)
{
// create threads
Thread[] t = new Thread[nthreads];
for(int j = 0; j < nthreads; j++)
{
t[j] = new Thread((new Processor(d[k])).Run);
k++;
}
// start threads
for(int j = 0; j < nthreads; j++)
{
t[j].Start();
}
// wait for threads to complete
for(int j = 0; j < nthreads; j++)
{
t[j].Join();
}
}
DateTime dt2 = DateTime.Now;
Console.WriteLine("{0} jobs executing in {1} threads : {2:F1} seconds", njobs, nthreads, (dt2 - dt1).TotalSeconds);
// check data
for(int i = 0; i < njobs; i++)
{
if(d[i].S != "XX") Console.WriteLine("Ooops");
}
}
public static void Main(string[] args)
{
Test(256, 1);
Test(256, 2);
Test(256, 4);
Test(256, 8);
Test(256, 16);
Test(256, 32);
Test(256, 64);
}
}
}
Example output:
256 jobs executing in 1 threads : 25.8 seconds
256 jobs executing in 2 threads : 14.3 seconds
256 jobs executing in 4 threads : 7.8 seconds
256 jobs executing in 8 threads : 4.3 seconds
256 jobs executing in 16 threads : 2.2 seconds
256 jobs executing in 32 threads : 1.1 seconds
256 jobs executing in 64 threads : 0.8 seconds
The same Thread example in VB.NET:
Imports System
Imports System.Threading
Namespace Basic
Public Class Program
Public Class Data
Public Property S() As String
End Class
Public Class Processor
Private d As Data
Public Sub New(d As Data)
Me.d = d
End Sub
Public Sub Run()
' simulate a lot of work that takes 0.1 second
Dim s As String = d.S
Thread.Sleep(100)
d.S = s & "X"
End Sub
End Class
Public Shared Sub Test(njobs As Integer, nthreads As Integer)
' setup data
Dim d As Data() = New Data(njobs - 1) {}
For i As Integer = 0 To njobs - 1
d(i) = New Data() With { .S = "X" }
Next
' process
Dim dt1 As DateTime = DateTime.Now
Dim k As Integer = 0
For i As Integer = 0 To njobs \ nthreads - 1
' create threads
Dim t As Thread() = New Thread(nthreads - 1) {}
For j As Integer = 0 To nthreads - 1
t(j) = New Thread(AddressOf (New Processor(d(k))).Run)
k += 1
Next
' start threads
For j As Integer = 0 To nthreads - 1
t(j).Start()
Next
' wait for threads to complete
For j As Integer = 0 To nthreads - 1
t(j).Join()
Next
Next
Dim dt2 As DateTime = DateTime.Now
Console.WriteLine("{0} jobs executing in {1} threads : {2:F1} seconds", njobs, nthreads, (dt2 - dt1).TotalSeconds)
' check data
For i As Integer = 0 To njobs - 1
If d(i).S <> "XX" Then
Console.WriteLine("Ooops")
End If
Next
End Sub
Public Shared Sub Main(args As String())
Test(256, 1)
Test(256, 2)
Test(256, 4)
Test(256, 8)
Test(256, 16)
Test(256, 32)
Test(256, 64)
End Sub
End Class
End Namespace
Example output:
256 jobs executing in 1 threads : 25.9 seconds
256 jobs executing in 2 threads : 13.9 seconds
256 jobs executing in 4 threads : 7.6 seconds
256 jobs executing in 8 threads : 4.0 seconds
256 jobs executing in 16 threads : 2.0 seconds
256 jobs executing in 32 threads : 1.0 seconds
256 jobs executing in 64 threads : 1.0 seconds
Windows provides thread support in the Win32 API.
There are two different ways to start a thread.
One way is the Win32 API call CreateThread.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <windows.h>
#define MAX_STR_LEN 1024
struct data {
char s[MAX_STR_LEN];
};
DWORD WINAPI run(void *p)
{
struct data *d;
char s[MAX_STR_LEN];
d = p;
/* simulate a lot of work that takes 0.1 second */
strcpy(s, d->s);
Sleep(100);
strcpy(d->s, s);
strcat(d->s, "X");
return 0;
}
void test(int njobs, int nthreads)
{
struct data *d;
HANDLE *t;
time_t t1, t2;
int i, j, k;
/* setup data */
d = malloc(njobs * sizeof(struct data));
for(i = 0; i < njobs; i++)
{
strcpy(d[i].s, "X");
}
/* process */
t1 = time(NULL);
k = 0;
for(i = 0; i < njobs / nthreads; i++)
{
/* create and start threads */
t = malloc(nthreads * sizeof(HANDLE));
for(j = 0; j < nthreads; j++)
{
t[j] = CreateThread(NULL, 0, run, &d[k], 0, NULL);
k++;
}
/* wait for threads to complete */
for(j = 0; j < nthreads; j++)
{
WaitForSingleObject(t[j], INFINITE);
CloseHandle(t[j]);
}
free(t);
}
t2 = time(NULL);
printf("%d jobs executing in %d threads : %d seconds\n", njobs, nthreads, (int)(t2 - t1));
/* check data */
for(i = 0; i < njobs; i++)
{
if(strcmp(d[i].s, "XX") != 0) printf("Ooops\n");
}
free(d);
}
int main()
{
test(256, 1);
test(256, 2);
test(256, 4);
test(256, 8);
test(256, 16);
test(256, 32);
test(256, 64);
return 0;
}
Example output:
256 jobs executing in 1 threads : 26 seconds
256 jobs executing in 2 threads : 12 seconds
256 jobs executing in 4 threads : 7 seconds
256 jobs executing in 8 threads : 3 seconds
256 jobs executing in 16 threads : 2 seconds
256 jobs executing in 32 threads : 0 seconds
256 jobs executing in 64 threads : 1 seconds
(The C examples measure time with time(), which has one-second resolution, so these timings are only accurate to about one second.)
Another way is to use one of the Microsoft C runtime (MSVCRT) functions _beginthread or _beginthreadex. _beginthread is best avoided because it does not provide a convenient way to manage the created thread: its handle is closed automatically when the thread exits, so the returned handle cannot safely be waited on. _beginthreadex does not have that problem and is actually preferred over CreateThread for C/C++ applications, because it prepares the C runtime for multi-threaded usage.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <windows.h>
#include <process.h>
#define MAX_STR_LEN 1024
struct data {
char s[MAX_STR_LEN];
};
unsigned int __stdcall run(void *p)
{
struct data *d;
char s[MAX_STR_LEN];
d = p;
/* simulate a lot of work that takes 0.1 second */
strcpy(s, d->s);
Sleep(100);
strcpy(d->s, s);
strcat(d->s, "X");
return 0;
}
void test(int njobs, int nthreads)
{
struct data *d;
HANDLE *t;
time_t t1, t2;
int i, j, k;
/* setup data */
d = malloc(njobs * sizeof(struct data));
for(i = 0; i < njobs; i++)
{
strcpy(d[i].s, "X");
}
/* process */
t1 = time(NULL);
k = 0;
for(i = 0; i < njobs / nthreads; i++)
{
/* create and start threads */
t = malloc(nthreads * sizeof(HANDLE));
for(j = 0; j < nthreads; j++)
{
t[j] = (HANDLE)_beginthreadex(NULL, 0, run, &d[k], 0, NULL);
k++;
}
/* wait for threads to complete */
for(j = 0; j < nthreads; j++)
{
WaitForSingleObject(t[j], INFINITE);
CloseHandle(t[j]);
}
free(t);
}
t2 = time(NULL);
printf("%d jobs executing in %d threads : %d seconds\n", njobs, nthreads, (int)(t2 - t1));
/* check data */
for(i = 0; i < njobs; i++)
{
if(strcmp(d[i].s, "XX") != 0) printf("Ooops\n");
}
free(d);
}
int main()
{
test(256, 1);
test(256, 2);
test(256, 4);
test(256, 8);
test(256, 16);
test(256, 32);
test(256, 64);
return 0;
}
Example output:
256 jobs executing in 1 threads : 26 seconds
256 jobs executing in 2 threads : 13 seconds
256 jobs executing in 4 threads : 6 seconds
256 jobs executing in 8 threads : 3 seconds
256 jobs executing in 16 threads : 2 seconds
256 jobs executing in 32 threads : 1 seconds
256 jobs executing in 64 threads : 0 seconds
POSIX threads (Pthreads) are available on all POSIX-compliant operating systems. That means Unix (including Mac OS X), Linux, IBM mainframe, OpenVMS etc. In practice that is almost everything except Windows.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>
#define MAX_STR_LEN 1024
struct data {
char s[MAX_STR_LEN];
};
void *run(void *p)
{
struct data *d;
char s[MAX_STR_LEN];
d = p;
/* simulate a lot of work that takes 0.1 second */
strcpy(s, d->s);
usleep(100000);
strcpy(d->s, s);
strcat(d->s, "X");
return NULL;
}
void test(int njobs, int nthreads)
{
struct data *d;
pthread_t *t;
time_t t1, t2;
int i, j, k;
void *p;
/* setup data */
d = malloc(njobs * sizeof(struct data));
for(i = 0; i < njobs; i++)
{
strcpy(d[i].s, "X");
}
/* process */
t1 = time(NULL);
k = 0;
for(i = 0; i < njobs / nthreads; i++)
{
/* create and start threads */
t = malloc(nthreads * sizeof(pthread_t));
for(j = 0; j < nthreads; j++)
{
pthread_create(&t[j], NULL, run, &d[k]);
k++;
}
/* wait for threads to complete */
for(j = 0; j < nthreads; j++)
{
pthread_join(t[j], &p);
}
free(t);
}
t2 = time(NULL);
printf("%d jobs executing in %d threads : %d seconds\n", njobs, nthreads, (int)(t2 - t1));
/* check data */
for(i = 0; i < njobs; i++)
{
if(strcmp(d[i].s, "XX") != 0) printf("Ooops\n");
}
free(d);
}
int main()
{
test(256, 1);
test(256, 2);
test(256, 4);
test(256, 8);
test(256, 16);
test(256, 32);
test(256, 64);
return 0;
}
Example output:
256 jobs executing in 1 threads : 26 seconds
256 jobs executing in 2 threads : 12 seconds
256 jobs executing in 4 threads : 7 seconds
256 jobs executing in 8 threads : 3 seconds
256 jobs executing in 16 threads : 2 seconds
256 jobs executing in 32 threads : 0 seconds
256 jobs executing in 64 threads : 1 seconds
Boost is a very widely used collection of C++ libraries, and it includes a thread library (Boost.Thread).
#include <iostream>
#include <iomanip>
#include <string>
//using namespace std; // std may conflict with boost for newer C++
using std::string;
using std::cout;
using std::endl;
using std::fixed;
using std::setprecision;
#include <boost/thread.hpp>
#include <boost/chrono.hpp>
using namespace boost;
using namespace boost::chrono;
class Data
{
private:
string s;
public:
Data(string s);
string GetS();
void SetS(string s);
};
Data::Data(string s)
{
this->s = s;
}
string Data::GetS()
{
return s;
}
void Data::SetS(string s)
{
this->s = s;
}
typedef Data *DataPtr;
void run(Data *d)
{
// simulate a lot of work that takes 0.1 second
string s = d->GetS();
this_thread::sleep_for(milliseconds(100));
d->SetS(s + "X"); // method name is SetS (C++ is case sensitive)
}
typedef thread *thread_ptr;
void test(int njobs, int nthreads)
{
// setup data
DataPtr *d = new DataPtr[njobs];
for(int i = 0; i < njobs; i++)
{
d[i] = new Data("X");
}
// process
time_point<system_clock> t1 = system_clock::now();
int k = 0;
for(int i = 0; i < njobs / nthreads; i++)
{
// create and start threads
thread_ptr *t = new thread_ptr[nthreads];
for(int j = 0; j < nthreads; j++)
{
t[j] = new thread(run, d[k]);
k++;
}
// wait for threads to complete
for(int j = 0; j < nthreads; j++)
{
t[j]->join();
delete t[j];
}
delete[] t;
}
time_point<system_clock> t2 = system_clock::now();
duration<double> dt = t2 - t1;
cout << njobs << " jobs executing in " << nthreads << " threads : " << fixed << setprecision(1) << dt.count() << " seconds" << endl;
// check data
for(int i = 0; i < njobs; i++)
{
if(d[i]->GetS() != "XX") cout << "Ooops" << endl;
delete d[i];
}
delete[] d;
}
int main()
{
test(256, 1);
test(256, 2);
test(256, 4);
test(256, 8);
test(256, 16);
test(256, 32);
test(256, 64);
return 0;
}
(The C++ style is probably a bit old-fashioned, but the purpose is to show thread functionality, not modern C++ style.)
Example output:
256 jobs executing in 1 threads : 31.2 seconds
256 jobs executing in 2 threads : 15.6 seconds
256 jobs executing in 4 threads : 7.8 seconds
256 jobs executing in 8 threads : 3.9 seconds
256 jobs executing in 16 threads : 2.0 seconds
256 jobs executing in 32 threads : 1.0 seconds
256 jobs executing in 64 threads : 0.5 seconds
C++11 added a thread library to the standard library. It is largely based on the Boost thread library.
#include <iostream>
#include <iomanip>
#include <string>
#include <thread>
#include <chrono>
using namespace std;
using namespace std::chrono;
class Data
{
private:
string s;
public:
Data(string s);
string GetS();
void SetS(string s);
};
Data::Data(string s)
{
this->s = s;
}
string Data::GetS()
{
return s;
}
void Data::SetS(string s)
{
this->s = s;
}
typedef Data *DataPtr;
void run(Data *d)
{
// simulate a lot of work that takes 0.1 second
string s = d->GetS();
this_thread::sleep_for(milliseconds(100));
d->SetS(s + "X"); // method name is SetS (C++ is case sensitive)
}
typedef thread *thread_ptr;
void test(int njobs, int nthreads)
{
// setup data
DataPtr *d = new DataPtr[njobs];
for(int i = 0; i < njobs; i++)
{
d[i] = new Data("X");
}
// process
time_point<system_clock> t1 = system_clock::now();
int k = 0;
for(int i = 0; i < njobs / nthreads; i++)
{
// create and start threads
thread_ptr *t = new thread_ptr[nthreads];
for(int j = 0; j < nthreads; j++)
{
t[j] = new thread(run, d[k]);
k++;
}
// wait for threads to complete
for(int j = 0; j < nthreads; j++)
{
t[j]->join();
delete t[j];
}
delete[] t;
}
time_point<system_clock> t2 = system_clock::now();
duration<double> dt = t2 - t1;
cout << njobs << " jobs executing in " << nthreads << " threads : " << fixed << setprecision(1) << dt.count() << " seconds" << endl;
// check data
for(int i = 0; i < njobs; i++)
{
if(d[i]->GetS() != "XX") cout << "Ooops" << endl;
delete d[i];
}
delete[] d;
}
int main()
{
test(256, 1);
test(256, 2);
test(256, 4);
test(256, 8);
test(256, 16);
test(256, 32);
test(256, 64);
return 0;
}
(The C++ style is probably a bit old-fashioned, but the purpose is to show thread functionality, not modern C++ style.)
Example output:
256 jobs executing in 1 threads : 28.0 seconds
256 jobs executing in 2 threads : 14.0 seconds
256 jobs executing in 4 threads : 7.0 seconds
256 jobs executing in 8 threads : 3.5 seconds
256 jobs executing in 16 threads : 1.7 seconds
256 jobs executing in 32 threads : 0.9 seconds
256 jobs executing in 64 threads : 0.4 seconds
Qt has a thread library that is very much inspired by Java.
#include <iostream>
#include <iomanip>
#include <string>
using namespace std;
#include <QtCore/QThread>
#include <QtCore/QTime>
class Data
{
private:
string s;
public:
Data(string s);
string GetS();
void SetS(string s);
};
Data::Data(string s)
{
this->s = s;
}
string Data::GetS()
{
return s;
}
void Data::SetS(string s)
{
this->s = s;
}
typedef Data *DataPtr;
class Processor : public QThread
{
private:
Data *d;
protected:
virtual void run();
public:
Processor(Data *d);
};
Processor::Processor(Data *d)
{
this->d = d;
}
void Processor::run()
{
// simulate a lot of work that takes 0.1 second
string s = d->GetS();
QThread::msleep(100);
d->SetS(s + "X");
}
typedef QThread *QThreadPtr;
void test(int njobs, int nthreads)
{
// setup data
DataPtr *d = new DataPtr[njobs];
for(int i = 0; i < njobs; i++)
{
d[i] = new Data("X");
}
// process
QTime tim;
tim.start();
int k = 0;
for(int i = 0; i < njobs / nthreads; i++)
{
// create threads
QThreadPtr *t = new QThreadPtr[nthreads];
for(int j = 0; j < nthreads; j++)
{
t[j] = new Processor(d[k]);
k++;
}
// start threads
for(int j = 0; j < nthreads; j++)
{
t[j]->start();
}
// wait for threads to complete
for(int j = 0; j < nthreads; j++)
{
t[j]->wait();
delete t[j];
}
delete[] t;
}
int dt = tim.elapsed();
cout << njobs << " jobs executing in " << nthreads << " threads : " << fixed << setprecision(1) << (dt / 1000.0) << " seconds" << endl;
// check data
for(int i = 0; i < njobs; i++)
{
if(d[i]->GetS() != "XX") cout << "Ooops" << endl;
delete d[i];
}
delete[] d;
}
void realmain()
{
test(256, 1);
test(256, 2);
test(256, 4);
test(256, 8);
test(256, 16);
test(256, 32);
test(256, 64);
}
// minimal entry point so the example links and runs
int main()
{
realmain();
return 0;
}
Example output:
256 jobs executing in 1 threads : 28.0 seconds
256 jobs executing in 2 threads : 14.0 seconds
256 jobs executing in 4 threads : 7.0 seconds
256 jobs executing in 8 threads : 3.5 seconds
256 jobs executing in 16 threads : 1.7 seconds
256 jobs executing in 32 threads : 0.9 seconds
256 jobs executing in 64 threads : 0.4 seconds
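Delphi/Lazarus provide a wrapper class (TThread) around the underlying operating system's thread API (Win32 API on Windows and Pthreads on *nix). The example below also uses the Windows unit (for Sleep and GetTickCount), so as written it is Windows-only.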
program Basic;
uses
Classes, SysUtils, Windows;
type
Data = class(TObject)
constructor Create(s : string);
function GetS : string;
procedure SetS(s : string);
property S : string read GetS write SetS;
private
_s : string;
end;
Processor = class(TThread)
constructor Create(d : Data);
function GetD : Data;
property D : Data read GetD;
protected
procedure Execute; override;
private
_d : Data;
end;
constructor Data.Create(s : string);
begin
_s := s;
end;
function Data.GetS : string;
begin
GetS := _s;
end;
procedure Data.SetS(s : string);
begin
_s := s;
end;
constructor Processor.Create(d : Data);
begin
inherited Create(true);
_d := d;
end;
function Processor.GetD : Data;
begin
GetD := _d;
end;
procedure Processor.Execute;
var
s : string;
begin
(* simulate a lot of work that takes 0.1 second *)
s := D.S;
Sleep(100);
D.S := s + 'X';
end;
procedure Test(njobs, nthreads : integer);
var
d : array of Data;
t : array of TThread;
t1, t2 : integer;
dt : double;
i, j, k : integer;
begin
(* setup data*)
SetLength(d, njobs);
for i := 0 to njobs-1 do begin
d[i] := Data.Create('X');
end;
(* process *)
t1 := GetTickCount;
k := 0;
for i := 0 to (njobs div nthreads - 1) do begin
SetLength(t, nthreads);
(* create threads *)
for j := 0 to nthreads-1 do begin
t[j] := Processor.Create(d[k]);
k := k + 1;
end;
(* start threads *)
for j := 0 to nthreads-1 do begin
t[j].Start;
end;
(* wait for threads to complete *)
for j := 0 to nthreads-1 do begin
t[j].WaitFor;
t[j].Terminate;
t[j].Free;
end;
end;
t2 := GetTickCount;
dt := (t2 - t1) / 1000;
writeln(njobs:1,' jobs executing in ',nthreads:1,' threads : ',dt:1:1,' seconds');
(* check data *)
for i := 0 to njobs-1 do begin
if d[i].S <> 'XX' then writeln('Ooops');
d[i].Free;
end;
end;
begin
Test(256, 1);
Test(256, 2);
Test(256, 4);
Test(256, 8);
Test(256, 16);
Test(256, 32);
Test(256, 64);
end.
Example output:
256 jobs executing in 1 threads : 25.6 seconds
256 jobs executing in 2 threads : 12.8 seconds
256 jobs executing in 4 threads : 6.4 seconds
256 jobs executing in 8 threads : 3.2 seconds
256 jobs executing in 16 threads : 1.6 seconds
256 jobs executing in 32 threads : 0.8 seconds
256 jobs executing in 64 threads : 0.4 seconds
Python comes with a threading module.
The standard Python implementation, CPython, has a GIL (Global Interpreter Lock), which means it can only interpret Python code in one thread at a time. Note that activities that do not interpret Python code, like IO (and the time.sleep used in the example below), can still run in parallel, because the GIL is released while waiting. Some other Python implementations, like Jython, do not have a GIL.
import threading
import time
class Data(object):
    def __init__(self, _s):
        self.s = _s
class Processor(threading.Thread):
    def __init__(self, _d):
        super(Processor, self).__init__()
        self.d = _d
    def run(self):
        # simulate a lot of work that takes 0.1 second
        s = self.d.s
        time.sleep(0.1)
        self.d.s = s + "X"
def test(njobs, nthreads):
    # setup data
    d = []
    for i in range(njobs):
        d.append(Data("X"))
    # process
    t1 = time.time()
    k = 0
    for i in range(njobs // nthreads): # // is integer division in both Python 2 and 3
        # create threads
        t = []
        for j in range(nthreads):
            t.append(Processor(d[k]))
            k = k + 1
        # start threads
        for j in range(nthreads):
            t[j].start()
        # wait for threads to complete
        for j in range(nthreads):
            t[j].join()
    t2 = time.time()
    print("%d jobs executing in %d threads : %.1f seconds" % (njobs, nthreads, (t2 - t1)))
    # check data
    for i in range(njobs):
        if d[i].s != "XX":
            print("Ooops")
test(256, 1)
test(256, 2)
test(256, 4)
test(256, 8)
test(256, 16)
test(256, 32)
test(256, 64)
Example output:
256 jobs executing in 1 threads : 28.0 seconds
256 jobs executing in 2 threads : 14.0 seconds
256 jobs executing in 4 threads : 7.0 seconds
256 jobs executing in 8 threads : 3.5 seconds
256 jobs executing in 16 threads : 1.7 seconds
256 jobs executing in 32 threads : 0.9 seconds
256 jobs executing in 64 threads : 0.4 seconds
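Like Java's Runnable alternative, Python's threading.Thread can be given a plain function through its `target` and `args` parameters instead of being subclassed. Here is a minimal sketch of the same job in that style. It reuses the article's `Data` class and a module-level `run` function, and runs only 8 jobs to keep it short.

```python
import threading
import time

class Data(object):
    def __init__(self, s):
        self.s = s

def run(d):
    # simulate work that takes 0.1 second
    s = d.s
    time.sleep(0.1)
    d.s = s + "X"

d = [Data("X") for _ in range(8)]
# pass the function as target instead of subclassing threading.Thread
t = [threading.Thread(target=run, args=(d[i],)) for i in range(8)]
for th in t:
    th.start()
for th in t:
    th.join()

print(all(x.s == "XX" for x in d))  # True
```

Subclassing is convenient when the thread carries its own state. A `target` function keeps the work independent of the threading machinery.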
Python comes with a multiprocessing module.
Note that the multiprocessing module does not use threads. Instead it starts copies of the Python program in separate processes (by spawning or forking). This works around the GIL limitation, but the overhead of process creation is obviously larger than that of thread creation (the specifics are OS dependent).
import multiprocessing
import time
class Data(object):
    def __init__(self, _s):
        self.s = _s
# non-OO as that is much simpler
def run(d, q, k):
    # simulate a lot of work that takes 0.1 second
    s = d.s
    time.sleep(0.1)
    d.s = s + "X"
    # send updated data back (the child process works on a copy of d)
    q.put([k, d])
def test(njobs, nthreads):
    # setup data
    d = []
    for i in range(njobs):
        d.append(Data("X"))
    # process
    t1 = time.time()
    q = multiprocessing.Queue(njobs)
    k = 0
    for i in range(njobs // nthreads): # // is integer division in both Python 2 and 3
        # create processes
        t = []
        for j in range(nthreads):
            t.append(multiprocessing.Process(target=run, args=(d[k],q,k,)))
            k = k + 1
        # start processes
        for j in range(nthreads):
            t[j].start()
        # get the updated data before joining: a process that has put data on
        # a queue may not terminate until that data has been flushed/consumed
        for j in range(nthreads):
            res = q.get()
            d[res[0]] = res[1]
        # wait for processes to complete
        for j in range(nthreads):
            t[j].join()
    t2 = time.time()
    print("%d jobs executing in %d threads : %.1f seconds" % (njobs, nthreads, (t2 - t1)))
    # check data
    for i in range(njobs):
        if d[i].s != "XX":
            print("Ooops" + d[i].s)
if __name__ == '__main__': # necessary as child processes may import this module
    test(128, 1)
    test(128, 2)
    test(128, 4)
    test(128, 8)
    test(128, 16)
    test(128, 32)
    test(128, 64)
Example output:
128 jobs executing in 1 threads : 26.4 seconds
128 jobs executing in 2 threads : 14.0 seconds
128 jobs executing in 4 threads : 7.6 seconds
128 jobs executing in 8 threads : 5.4 seconds
128 jobs executing in 16 threads : 4.8 seconds
128 jobs executing in 32 threads : 4.6 seconds
128 jobs executing in 64 threads : 4.5 seconds
Note that the results are not fully deterministic: the exact times will vary between runs.
Also note that the examples scale so well because the task is not CPU intensive (it mostly sleeps). If the task were CPU intensive, it would only scale to about 4 threads, because the test system has a single CPU with 4 cores. CPU-intensive Python code running in CPython threads would not scale at all, because of the GIL.
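The reasoning can be made concrete with an idealized model. `ideal_time` is a hypothetical helper that ignores thread creation and scheduling overhead. Waiting threads all overlap, while CPU-bound threads are limited by the number of cores.

```python
def ideal_time(njobs, seconds_per_job, nthreads, ncores, cpu_bound):
    """Idealized total run time; ignores thread creation and scheduling overhead."""
    if cpu_bound:
        # at most ncores threads can make progress at the same time
        parallel = min(nthreads, ncores)
    else:
        # waiting (sleeping or IO) threads need no core, so all nthreads overlap
        parallel = nthreads
    return njobs * seconds_per_job / parallel

for n in (1, 2, 4, 8, 16, 32, 64):
    print("%2d threads: waiting %.1f s, CPU bound %.1f s"
          % (n, ideal_time(256, 0.1, n, 4, False), ideal_time(256, 0.1, n, 4, True)))
```

For 256 jobs of 0.1 second each on 4 cores, the waiting case predicts 0.4 seconds with 64 threads, close to the 0.4 to 0.5 seconds measured in most examples above. The CPU-bound case stops improving at 6.4 seconds once there are 4 or more threads.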
Some frameworks have introduced thread pools to make it easier to create multi-threaded applications.
Instead of explicitly managing threads, the application just submits jobs to the thread pool, and the thread pool runs the jobs on a pool of threads. The threads are reused for many jobs, so thread creation cost is paid only once per thread.
Java 1.5 (2004) added the java.util.concurrent package, with many multi-threading support classes including thread pools (ExecutorService).
package threads;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Pool {
public static class Data {
private String s;
public Data(String s) {
this.s = s;
}
public String getS() {
return s;
}
public void setS(String s) {
this.s = s;
}
}
public static class Processor implements Callable<Void> {
private Data d;
public Processor(Data d) {
this.d = d;
}
@Override
public Void call() {
try {
// simulate a lot of work that takes 0.1 second
String s = d.getS();
Thread.sleep(100);
d.setS(s + 'X');
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
public static void test(int njobs, int nthreads) {
// setup data
Data[] d = new Data[njobs];
for(int i = 0; i < njobs; i++) {
d[i] = new Data("X");
}
long t1 = System.currentTimeMillis();
ExecutorService es = Executors.newFixedThreadPool(nthreads);
// submit jobs
@SuppressWarnings("unchecked")
Future<Void>[] res = new Future[njobs];
for(int i = 0; i < njobs; i++) {
res[i] = es.submit(new Processor(d[i]));
}
// wait for jobs to complete
for(int i = 0; i < njobs; i++) {
try {
res[i].get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
es.shutdown();
long t2 = System.currentTimeMillis();
System.out.printf("%d jobs executing in %d threads : %.1f seconds\n", njobs, nthreads, (t2 - t1) / 1000.0);
// check data
for(int i = 0; i < njobs; i++) {
if(!d[i].getS().equals("XX")) System.out.println("Ooops");
}
}
public static void main(String[] args) {
test(256, 1);
test(256, 2);
test(256, 4);
test(256, 8);
test(256, 16);
test(256, 32);
test(256, 64);
}
}
Example output:
256 jobs executing in 1 threads : 25.6 seconds
256 jobs executing in 2 threads : 12.8 seconds
256 jobs executing in 4 threads : 6.4 seconds
256 jobs executing in 8 threads : 3.2 seconds
256 jobs executing in 16 threads : 1.6 seconds
256 jobs executing in 32 threads : 0.8 seconds
256 jobs executing in 64 threads : 0.4 seconds
.NET has always had a thread pool class (System.Threading.ThreadPool).
using System;
using System.Threading;
namespace PoolOld
{
public class Program
{
public class Counter
{
public int N { get; set; }
}
public class Data
{
public string S { get; set; }
}
public class Processor
{
private Data d;
public Processor(Data d)
{
this.d = d;
}
public void Run(Object c)
{
// simulate a lot of work that takes 0.1 second
string s = d.S;
Thread.Sleep(100);
d.S = s + "X";
// count down
lock(c)
{
((Counter)c).N--;
}
}
}
public static void Test(int njobs, int nthreads)
{
// setup data
Data[] d = new Data[njobs];
for(int i = 0; i < njobs; i++)
{
d[i] = new Data { S = "X" };
}
// process
DateTime dt1 = DateTime.Now;
ThreadPool.SetMinThreads(nthreads, nthreads);
ThreadPool.SetMaxThreads(nthreads, nthreads);
// submit jobs
Counter c = new Counter { N = njobs };
for(int i = 0; i < njobs; i++)
{
ThreadPool.QueueUserWorkItem(new WaitCallback((new Processor(d[i])).Run), c);
}
// wait for jobs to complete
while(true)
{
lock(c)
{
if(c.N <= 0) break;
}
Thread.Sleep(25);
}
DateTime dt2 = DateTime.Now;
Console.WriteLine("{0} jobs executing in {1} threads : {2:F1} seconds", njobs, nthreads, (dt2 - dt1).TotalSeconds);
// check data
for(int i = 0; i < njobs; i++)
{
if(d[i].S != "XX") Console.WriteLine("Ooops");
}
}
public static void Main(string[] args)
{
Test(256, 1);
Test(256, 2);
Test(256, 4);
Test(256, 8);
Test(256, 16);
Test(256, 32);
Test(256, 64);
}
}
}
Example output:
256 jobs executing in 1 threads : 25.6 seconds
256 jobs executing in 2 threads : 15.3 seconds
256 jobs executing in 4 threads : 7.6 seconds
256 jobs executing in 8 threads : 3.9 seconds
256 jobs executing in 16 threads : 2.8 seconds
256 jobs executing in 32 threads : 2.2 seconds
256 jobs executing in 64 threads : 2.0 seconds
The same thread pool example in VB.NET:
Imports System
Imports System.Threading
Namespace PoolOld
Public Class Program
Public Class Counter
Public Property N() As Integer
End Class
Public Class Data
Public Property S() As String
End Class
Public Class Processor
Private d As Data
Public Sub New(d As Data)
Me.d = d
End Sub
Public Sub Run(c As Object)
' simulate a lot of work that takes 0.1 second
Dim s As String = d.S
Thread.Sleep(100)
d.S = s & "X"
' count down
SyncLock c
DirectCast(c, Counter).N -= 1
End SyncLock
End Sub
End Class
Public Shared Sub Test(njobs As Integer, nthreads As Integer)
' setup data
Dim d As Data() = New Data(njobs - 1) {}
For i As Integer = 0 To njobs - 1
d(i) = New Data() With { .S = "X" }
Next
' process
Dim dt1 As DateTime = DateTime.Now
ThreadPool.SetMaxThreads(nthreads, nthreads) ' set the maximum first: SetMinThreads fails if the new minimum exceeds the current maximum
ThreadPool.SetMinThreads(nthreads, nthreads)
' submit jobs
Dim c As New Counter() With { .N = njobs }
For i As Integer = 0 To njobs - 1
ThreadPool.QueueUserWorkItem(New WaitCallback(AddressOf (New Processor(d(i))).Run), c)
Next
' wait for jobs to complete
While True
SyncLock c
If c.N <= 0 Then
Exit While
End If
End SyncLock
Thread.Sleep(25)
End While
Dim dt2 As DateTime = DateTime.Now
Console.WriteLine("{0} jobs executing in {1} threads : {2:F1} seconds", njobs, nthreads, (dt2 - dt1).TotalSeconds)
' check data
For i As Integer = 0 To njobs - 1
If d(i).S <> "XX" Then
Console.WriteLine("Ooops")
End If
Next
End Sub
Public Shared Sub Main(args As String())
Test(256, 1)
Test(256, 2)
Test(256, 4)
Test(256, 8)
Test(256, 16)
Test(256, 32)
Test(256, 64)
End Sub
End Class
End Namespace
Example output:
256 jobs executing in 1 threads : 25.6 seconds
256 jobs executing in 2 threads : 15.2 seconds
256 jobs executing in 4 threads : 7.7 seconds
256 jobs executing in 8 threads : 4.1 seconds
256 jobs executing in 16 threads : 2.8 seconds
256 jobs executing in 32 threads : 2.2 seconds
256 jobs executing in 64 threads : 1.8 seconds
.NET 4.0 (2010) added the Task Parallel Library (TPL), including a new task scheduler.
using System;
using System.Threading;
using System.Threading.Tasks;
namespace PoolNew
{
public class Program
{
public class Data
{
public string S { get; set; }
}
public class Processor
{
private Data d;
public Processor(Data d)
{
this.d = d;
}
public void Run()
{
// simulate a lot of work that takes 0.1 second
string s = d.S;
Thread.Sleep(100);
d.S = s + "X";
}
}
public static void Test(int njobs)
{
// setup data
Data[] d = new Data[njobs];
for(int i = 0; i < njobs; i++)
{
d[i] = new Data { S = "X" };
}
DateTime dt1 = DateTime.Now;
// submit jobs
Task[] t = new Task[njobs];
for(int i = 0; i < njobs; i++)
{
t[i] = Task.Factory.StartNew((new Processor(d[i])).Run);
}
// wait for jobs to complete
for(int i = 0; i < njobs; i++)
{
t[i].Wait();
}
DateTime dt2 = DateTime.Now;
Console.WriteLine("{0} jobs executing in {1} threads : {2:F1} seconds", njobs, "many", (dt2 - dt1).TotalSeconds);
// check data
for(int i = 0; i < njobs; i++)
{
if(d[i].S != "XX") Console.WriteLine("Ooops");
}
}
public static void Main(string[] args)
{
Test(256);
}
}
}
Note that it is non-trivial to control the number of threads.
Example output:
256 jobs executing in many threads : 3.1 seconds
.NET 4.0 (2010) added the Task Parallel Library (TPL), including a new task scheduler.
Imports System
Imports System.Threading
Imports System.Threading.Tasks
Namespace PoolNew
Public Class Program
Public Class Data
Public Property S() As String
End Class
Public Class Processor
Private d As Data
Public Sub New(d As Data)
Me.d = d
End Sub
Public Sub Run()
' simulate a lot of work that takes 0.1 second
Dim s As String = d.S
Thread.Sleep(100)
d.S = s & "X"
End Sub
End Class
Public Shared Sub Test(njobs As Integer)
' setup data
Dim d As Data() = New Data(njobs - 1) {}
For i As Integer = 0 To njobs - 1
d(i) = New Data() With { .S = "X" }
Next
Dim dt1 As DateTime = DateTime.Now
' submit jobs
Dim t As Task() = New Task(njobs - 1) {}
For i As Integer = 0 To njobs - 1
t(i) = Task.Factory.StartNew(AddressOf (New Processor(d(i))).Run)
Next
' wait for jobs to complete
For i As Integer = 0 To njobs - 1
t(i).Wait()
Next
Dim dt2 As DateTime = DateTime.Now
Console.WriteLine("{0} jobs executing in {1} threads : {2:F1} seconds", njobs, "many", (dt2 - dt1).TotalSeconds)
' check data
For i As Integer = 0 To njobs - 1
If d(i).S <> "XX" Then
Console.WriteLine("Ooops")
End If
Next
End Sub
Public Shared Sub Main(args As String())
Test(256)
End Sub
End Class
End Namespace
Note that it is non-trivial to control the number of threads.
Example output:
256 jobs executing in many threads : 3.1 seconds
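With the TPL the scheduler decides how many threads to use. For comparison, a pool that takes an explicit size makes the bound easy to check. The sketch below is illustrative only: it submits jobs to Python's `ThreadPoolExecutor` with `max_workers` set and records `threading.get_ident()` in each job to count how many distinct threads actually ran them. (When `max_workers` is omitted, Python picks a default based on the CPU count; see the `ThreadPoolExecutor` documentation for the exact formula of your version.)

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def job(seen, lock):
    # record which worker thread executed this job
    with lock:
        seen.add(threading.get_ident())
    time.sleep(0.005)  # simulate a little work


def count_threads(njobs, nthreads):
    seen = set()
    lock = threading.Lock()
    # max_workers is an upper bound; the pool creates threads on demand up to it
    with ThreadPoolExecutor(max_workers=nthreads) as executor:
        for _ in range(njobs):
            executor.submit(job, seen, lock)
    # leaving the with-block calls shutdown(wait=True), so all jobs are done here
    return len(seen)


for n in (1, 2, 4, 8):
    print(n, "requested ->", count_threads(64, n), "used")
```

The count can be lower than `max_workers` if jobs finish faster than new threads are needed, but it never exceeds it.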
Boost introduced a thread pool in version 1.66 (note that it is in the Asio library, not the Thread library).
#include <iostream>
#include <iomanip>
#include <string>
//using namespace std; // std may conflict with boost for newer C++
using std::string;
using std::cout;
using std::endl;
using std::fixed;
using std::setprecision;
#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include <boost/asio.hpp>
using namespace boost;
using namespace boost::chrono;
using namespace boost::asio;
class Data
{
private:
string s;
public:
Data(string s);
string GetS();
void SetS(string s);
};
Data::Data(string s)
{
this->s = s;
}
string Data::GetS()
{
return s;
}
void Data::SetS(string s)
{
this->s = s;
}
typedef Data *DataPtr;
void run(Data *d)
{
// simulate a lot of work that takes 0.1 second
string s = d->GetS();
this_thread::sleep_for(milliseconds(100));
d->SetS(s + "X");
}
void test(int njobs, int nthreads)
{
// setup data
DataPtr *d = new DataPtr[njobs];
for(int i = 0; i < njobs; i++)
{
d[i] = new Data("X");
}
thread_pool pool(nthreads);
// submit jobs
time_point<system_clock> t1 = system_clock::now();
for(int i = 0; i < njobs; i++)
{
post(pool, bind(run, d[i]));
}
pool.join();
time_point<system_clock> t2 = system_clock::now();
duration<double> dt = t2 - t1;
cout << njobs << " jobs executing in " << nthreads << " threads : " << fixed << setprecision(1) << dt.count() << " seconds" << endl;
// check data
for(int i = 0; i < njobs; i++)
{
if(d[i]->GetS() != "XX") cout << "Ooops" << endl;
delete d[i];
}
delete[] d;
}
int main()
{
test(256, 1);
test(256, 2);
test(256, 4);
test(256, 8);
test(256, 16);
test(256, 32);
test(256, 64);
return 0;
}
Example output:
256 jobs executing in 1 threads : 31.2 seconds
256 jobs executing in 2 threads : 15.6 seconds
256 jobs executing in 4 threads : 7.8 seconds
256 jobs executing in 8 threads : 3.9 seconds
256 jobs executing in 16 threads : 2.0 seconds
256 jobs executing in 32 threads : 1.0 seconds
256 jobs executing in 64 threads : 0.5 seconds
Qt also has a thread pool.
#include <iostream>
#include <iomanip>
#include <string>
using namespace std;
#include <QtCore/QThreadPool>
#include <QtCore/QRunnable>
#include <QtCore/QTime>
class Data
{
private:
string s;
public:
Data(string s);
string GetS();
void SetS(string s);
};
Data::Data(string s)
{
this->s = s;
}
string Data::GetS()
{
return s;
}
void Data::SetS(string s)
{
this->s = s;
}
typedef Data *DataPtr;
class Processor : public QRunnable
{
private:
Data *d;
protected:
virtual void run();
public:
Processor(Data *d);
};
Processor::Processor(Data *d)
{
this->d = d;
}
void Processor::run()
{
// simulate a lot of work that takes 0.1 second
string s = d->GetS();
QThread::msleep(100);
d->SetS(s + "X");
}
typedef QThread *QThreadPtr;
void test(int njobs, int nthreads)
{
// setup data
DataPtr *d = new DataPtr[njobs];
for(int i = 0; i < njobs; i++)
{
d[i] = new Data("X");
}
QTime tim;
tim.start();
QThreadPool pool;
pool.setMaxThreadCount(nthreads);
// submit jobs
for(int i = 0; i < njobs; i++)
{
pool.start(new Processor(d[i]));
}
// wait for jobs to complete
pool.waitForDone();
int dt = tim.elapsed();
cout << njobs << " jobs executing in " << nthreads << " threads : " << fixed << setprecision(1) << (dt / 1000.0) << " seconds" << endl;;
// check data
for(int i = 0; i < njobs; i++)
{
if(d[i]->GetS() != "XX") cout << "Ooops" << endl;
delete d[i];
}
delete[] d;
}
void realmain()
{
test(256, 1);
test(256, 2);
test(256, 4);
test(256, 8);
test(256, 16);
test(256, 32);
test(256, 64);
}
Example output:
256 jobs executing in 1 threads : 28.0 seconds
256 jobs executing in 2 threads : 14.0 seconds
256 jobs executing in 4 threads : 7.0 seconds
256 jobs executing in 8 threads : 3.5 seconds
256 jobs executing in 16 threads : 1.7 seconds
256 jobs executing in 32 threads : 0.9 seconds
256 jobs executing in 64 threads : 0.4 seconds
Python 3.2 (2011) introduced a thread pool that is heavily inspired by the Java thread pool.
# requires Python >= 3.2
import concurrent.futures
import time
class Data(object):
    def __init__(self, _s):
        self.s = _s
# non-OO as that is much simpler
def run(arg):
    d = arg[0]
    # simulate a lot of work that takes 0.1 second
    s = d.s
    time.sleep(0.1)
    d.s = s + "X"
    return d
def test(njobs, nthreads):
    # setup data
    d = []
    for i in range(njobs):
        d.append(Data("X"))
    # process
    t1 = time.time()
    executor = concurrent.futures.ThreadPoolExecutor(nthreads)
    fut = []
    for i in range(njobs):
        fut.append(executor.submit(run, [d[i]]))
    for i in range(njobs):
        d[i] = fut[i].result()
    t2 = time.time()
    # release the worker threads
    executor.shutdown()
    print("%d jobs executing in %d threads : %.1f seconds" % (njobs, nthreads, (t2 - t1)))
    # check data
    for i in range(njobs):
        if d[i].s != "XX":
            print("Ooops" + d[i].s)
if __name__ == '__main__': # not strictly needed for threads, but harmless
    test(256, 1)
    test(256, 2)
    test(256, 4)
    test(256, 8)
    test(256, 16)
    test(256, 32)
    test(256, 64)
Example output:
256 jobs executing in 1 threads : 28.0 seconds
256 jobs executing in 2 threads : 14.0 seconds
256 jobs executing in 4 threads : 7.0 seconds
256 jobs executing in 8 threads : 3.5 seconds
256 jobs executing in 16 threads : 1.7 seconds
256 jobs executing in 32 threads : 0.9 seconds
256 jobs executing in 64 threads : 0.4 seconds
The multiprocessing module also has a pool, which runs the jobs in worker processes instead of threads.
The interface exposed by the pool is much nicer than the basic one, so one should always use the pool.
import multiprocessing
import time
class Data(object):
    def __init__(self, _s):
        self.s = _s
# non-OO as that is much simpler
def run(d):
    # simulate a lot of work that takes 0.1 second
    s = d.s
    time.sleep(0.1)
    d.s = s + "X"
    # d is a copy in the worker process, so the result must be returned
    return d
def test(njobs, nthreads):
    # setup data
    d = []
    for i in range(njobs):
        d.append(Data("X"))
    # process
    t1 = time.time()
    pool = multiprocessing.Pool(nthreads)
    res = []
    for i in range(njobs):
        res.append(pool.apply_async(run, [d[i]]))
    for i in range(njobs):
        d[i] = res[i].get()
    t2 = time.time()
    # shut down the worker processes
    pool.close()
    pool.join()
    print("%d jobs executing in %d threads : %.1f seconds" % (njobs, nthreads, (t2 - t1)))
    # check data
    for i in range(njobs):
        if d[i].s != "XX":
            print("Ooops" + d[i].s)
if __name__ == '__main__': # necessary as child processes may import this module
    test(128, 1)
    test(128, 2)
    test(128, 4)
    test(128, 8)
    test(128, 16)
    test(128, 32)
    test(128, 64)
Example output:
128 jobs executing in 1 threads : 14.1 seconds
128 jobs executing in 2 threads : 7.1 seconds
128 jobs executing in 4 threads : 3.6 seconds
128 jobs executing in 8 threads : 2.0 seconds
128 jobs executing in 16 threads : 1.2 seconds
128 jobs executing in 32 threads : 1.1 seconds
128 jobs executing in 64 threads : 1.8 seconds
The addition of FP (Functional Programming) to some languages has made it possible to do multi-threading in a much simpler way.
Java 8 (2014) added streams and lambdas to Java.
package threads;
import java.util.Arrays;
public class Adv {
public static class Data {
private String s;
public Data(String s) {
this.s = s;
}
public String getS() {
return s;
}
public void setS(String s) {
this.s = s;
}
}
public static class Processor {
private Data d;
public Processor(Data d) {
this.d = d;
}
public void run() {
try {
// simulate a lot of work that takes 0.1 second
String s = d.getS();
Thread.sleep(100);
d.setS(s + 'X');
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void test(boolean par, int njobs) {
// setup data
Data[] d = new Data[njobs];
for(int i = 0; i < njobs; i++) {
d[i] = new Data("X");
}
// process
long t1 = System.currentTimeMillis();
if(par) {
Arrays.stream(d).parallel().forEach(o -> (new Processor(o)).run());
} else {
Arrays.stream(d).sequential().forEach(o -> (new Processor(o)).run());
}
long t2 = System.currentTimeMillis();
System.out.printf("%d jobs executing in %s threads : %.1f seconds\n", njobs, par ? "many" : "one", (t2 - t1) / 1000.0);
// check data
for(int i = 0; i < njobs; i++) {
if(!d[i].getS().equals("XX")) System.out.println("Ooops");
}
}
public static void main(String[] args) {
test(false, 256);
test(true, 256);
}
}
Note that it is non-trivial to control the number of threads.
Example output:
256 jobs executing in one threads : 25.7 seconds
256 jobs executing in many threads : 3.2 seconds
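.NET 3.5 (2008) added LINQ and lambdas to .NET. The parallel variant PLINQ (`AsParallel`), used in the example below, was added in .NET 4.0 (2010).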
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace Adv
{
public class Program
{
public class Data
{
public string S { get; set; }
}
public class Processor
{
private Data d;
public Processor(Data d)
{
this.d = d;
}
public void Run()
{
// simulate a lot of work that takes 0.1 second
string s = d.S;
Thread.Sleep(100);
d.S = s + "X";
}
}
public static void Test(bool par, int njobs)
{
// setup data
Data[] d = new Data[njobs];
for(int i = 0; i < njobs; i++)
{
d[i] = new Data { S = "X" };
}
// process
DateTime dt1 = DateTime.Now;
if(par)
{
d.AsParallel().ForAll(o => (new Processor(o)).Run());
}
else
{
d.ToList().ForEach(o => (new Processor(o)).Run());
}
DateTime dt2 = DateTime.Now;
Console.WriteLine("{0} jobs executing in {1} threads : {2:F1} seconds", njobs, par ? "many" : "one", (dt2 - dt1).TotalSeconds);
// check data
for(int i = 0; i < njobs; i++)
{
if(d[i].S != "XX") Console.WriteLine("Ooops");
}
}
public static void Main(string[] args)
{
Test(false, 256);
Test(true, 256);
}
}
}
Note that it is non-trivial to control the number of threads.
Example output:
256 jobs executing in one threads : 25.6 seconds
256 jobs executing in many threads : 3.2 seconds
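.NET 3.5 (2008) added LINQ and lambdas to .NET. The parallel variant PLINQ (`AsParallel`), used in the example below, was added in .NET 4.0 (2010).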
Imports System
Imports System.Collections.Generic
Imports System.Linq
Imports System.Threading
Namespace Adv
Public Class Program
Public Class Data
Public Property S() As String
End Class
Public Class Processor
Private d As Data
Public Sub New(d As Data)
Me.d = d
End Sub
Public Sub Run()
' simulate a lot of work that takes 0.1 second
Dim s As String = d.S
Thread.Sleep(100)
d.S = s & "X"
End Sub
End Class
Public Shared Sub Test(par As Boolean, njobs As Integer)
' setup data
Dim d As Data() = New Data(njobs - 1) {}
For i As Integer = 0 To njobs - 1
d(i) = New Data() With { .S = "X" }
Next
' process
Dim dt1 As DateTime = DateTime.Now
If par Then
d.AsParallel().ForAll(Sub(o As Data) Call (New Processor(o)).Run())
Else
d.ToList().ForEach(Sub(o As Data) Call (New Processor(o)).Run())
End If
Dim dt2 As DateTime = DateTime.Now
Console.WriteLine("{0} jobs executing in {1} threads : {2:F1} seconds", njobs, If(par, "many", "one"), (dt2 - dt1).TotalSeconds)
' check data
For i As Integer = 0 To njobs - 1
If d(i).S <> "XX" Then
Console.WriteLine("Ooops")
End If
Next
End Sub
Public Shared Sub Main(args As String())
Test(False, 256)
Test(True, 256)
End Sub
End Class
End Namespace
Note that it is non-trivial to control the number of threads.
Example output:
256 jobs executing in one threads : 25.6 seconds
256 jobs executing in many threads : 3.3 seconds
A totally different approach is to direct the compiler to do the parallelization.
OpenMP, which supports C, C++ and Fortran, takes this approach.
Fortran example:
program comp
call test(256, 1)
call test(256, 2)
call test(256, 4)
call test(256, 8)
call test(256, 16)
call test(256, 32)
call test(256, 64)
end
subroutine test(njobs, nthreads)
use omp_lib
implicit none
integer(kind=4) njobs, nthreads
integer(kind=4) i, j, k, t1, t2
character(len=2) sa(njobs)
call omp_set_num_threads(nthreads)
call system_clock(t1)
do i = 1, njobs
sa(i) = 'X '
end do
!$omp parallel private(i) shared(sa)
!$omp do
do i = 1, njobs
call run(sa(i))
end do
!$omp end do
!$omp end parallel
do i = 1, njobs
if (sa(i) .ne. 'XX') then
write(*,*) 'Ooops'
end if
end do
call system_clock(t2)
write(*,'(1x,I3,A,I2,A,F4.1,A)') njobs,' jobs executing in ', nthreads, ' threads : ', (t2 - t1) / 1000.0, ' seconds'
end
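subroutine run(s)
use iso_c_binding
implicit none
character(len=2) s
! explicit interface so the C library usleep receives its argument by value
interface
function usleep(us) bind(C, name='usleep')
import :: c_int
integer(c_int), value :: us
integer(c_int) :: usleep
end function usleep
end interface
integer(c_int) rc
! simulate a lot of work that takes 0.1 second
rc = usleep(100000_c_int)
s(2:2) = 'X'
end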
One simply marks the section of code that needs to be parallelized and tells the compiler which variables are private to each thread and which variables are shared between the threads.
Example output:
256 jobs executing in 1 threads : 28.0 seconds
256 jobs executing in 2 threads : 14.0 seconds
256 jobs executing in 4 threads : 7.0 seconds
256 jobs executing in 8 threads : 3.5 seconds
256 jobs executing in 16 threads : 1.7 seconds
256 jobs executing in 32 threads : 0.9 seconds
256 jobs executing in 64 threads : 0.4 seconds
There are some potential concurrency problems that may require synchronization.
First, there is the classic concurrent access problem of one thread's update being overwritten by another's (a lost update):
| Thread #1 | Thread #2 |
|---|---|
| Read variable M | |
| Calculate | |
| Write variable M | |
| | Read variable M |
| | Calculate |
| | Write variable M |
works fine, but:
| Thread #1 | Thread #2 |
|---|---|
| Read variable M | |
| Calculate | |
| | Read variable M |
| | Calculate |
| Write variable M | |
| | Write variable M |
does not produce the expected result: both threads read the same old value of M, so thread #2's write overwrites thread #1's update.
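The losing interleaving in the second table can be forced deterministically, which makes it easier to see than in a timing-dependent test. In this Python sketch, a `threading.Barrier` (an illustration device, not part of the original examples) makes both threads read M before either of them writes it back, so one of the two increments is always lost:

```python
import threading


class Shared:
    def __init__(self):
        self.m = 0


def increment(shared, barrier):
    value = shared.m   # read variable M
    barrier.wait()     # both threads have now read the old value
    value = value + 1  # calculate
    shared.m = value   # write variable M (the second write overwrites the first)


def lost_update():
    shared = Shared()
    barrier = threading.Barrier(2)
    threads = [threading.Thread(target=increment, args=(shared, barrier))
               for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return shared.m


print(lost_update())  # prints 1: two increments, but one was lost
```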
A second concurrent access problem is non-atomic writes, which can result in torn reads, where a reader sees a partially updated value:
| Thread #1 | Thread #2 |
|---|---|
| Write variable M.part1 | |
| Write variable M.part2 | |
| | Read variable M.part1 |
| | Read variable M.part2 |
works fine, but:
| Thread #1 | Thread #2 |
|---|---|
| Write variable M.part1 | |
| | Read variable M.part1 |
| | Read variable M.part2 |
| Write variable M.part2 | |
produces an inconsistent result: thread #2 sees the new value of M.part1 combined with the old value of M.part2.
Note that this problem can happen even with a simple assignment: on some 32-bit architectures, assignment to a 64-bit variable is not atomic.
The solution is to force thread #2 to wait before working on M until thread #1 is done working on M, i.e., to synchronize the threads.
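Synchronizing here means that the writer holds a lock across both partial writes and the reader holds the same lock across both partial reads, so the reader can never observe part1 and part2 from different updates. A minimal Python sketch, where the two attributes `part1` and `part2` stand in for the two halves of M:

```python
import threading


class Pair:
    """M split into two parts that must always be updated together."""

    def __init__(self):
        self.part1 = 0
        self.part2 = 0
        self.lock = threading.Lock()


def writer(m, n):
    for i in range(1, n + 1):
        with m.lock:  # both parts change inside one critical section
            m.part1 = i
            m.part2 = i


def reader(m, n, mismatches):
    for _ in range(n):
        with m.lock:  # both parts are read inside the same critical section
            p1 = m.part1
            p2 = m.part2
        if p1 != p2:
            mismatches.append((p1, p2))


def demo(n=20000):
    m = Pair()
    mismatches = []
    w = threading.Thread(target=writer, args=(m, n))
    r = threading.Thread(target=reader, args=(m, n, mismatches))
    w.start()
    r.start()
    w.join()
    r.join()
    return m, mismatches


m, bad = demo()
print(len(bad), m.part1, m.part2)  # prints 0 20000 20000
```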
A third problem is much more subtle. It is the change visibility problem.
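We tend to think of a multi-threaded application as all threads reading and writing the same main memory directly.
But in reality each CPU has its own cache, and a thread may read and write values in its CPU's cache rather than in main memory.
To avoid problems it is necessary to synchronize the CPU-specific caches with memory.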
Most programming languages/frameworks do that when:
(most, not all, so remember to check the specifics of what you use)
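So the classic construct, where one thread loops until a stop variable becomes true and another thread sets it to true to end the loop,
does not work unless the stop variable is declared volatile or synchronization is used: otherwise the looping thread may never see the change.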
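In Python, the usual way to write the stop-flag construct safely is `threading.Event`, whose `set`, `is_set` and `wait` methods are synchronized internally, rather than a plain boolean attribute. A minimal sketch; the shared one-element `counter` list is just an illustration device to show that the worker ran:

```python
import threading
import time


def worker(stop, counter):
    # loop until another thread sets the stop event
    while not stop.is_set():
        counter[0] += 1
        # wait() doubles as an interruptible sleep: it returns early once stop is set
        stop.wait(0.001)


stop = threading.Event()
counter = [0]
t = threading.Thread(target=worker, args=(stop, counter))
t.start()
time.sleep(0.05)
stop.set()           # request the worker to stop
t.join(timeout=1.0)  # the worker notices the event and exits promptly
print(t.is_alive(), counter[0])
```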
A fourth problem is that some modern CPUs reorder memory reads and/or writes behind the scenes as an optimization. This can create inconsistencies where one thread first writes to M1 and then to M2, but because of the reordering another thread can read the new value of M2 and then the old value of M1.
The solution is what is called a memory barrier. The same three items that typically handle the visibility problem also typically handle the reordering problem.
Let us first look at some code that does not work correctly:
package threads;
public class NotCorrect {
public static class Data {
private String s;
public Data(String s) {
this.s = s;
}
public String getS() {
return s;
}
public void setS(String s) {
this.s = s;
}
}
public static class Processor extends Thread {
private Data d;
public Processor(Data d) {
this.d = d;
}
@Override
public void run() {
// simulate some work between get and set
String s = d.getS();
Thread.yield();
d.setS(s + 'X');
}
}
public static void test(int njobs, int nthreads) {
// setup data
Data d = new Data("");
// process with shared data
for(int i = 0; i < njobs / nthreads; i++) {
// create threads
Thread[] t = new Thread[nthreads];
for(int j = 0; j < nthreads; j++) {
t[j] = new Processor(d);
}
// start threads
for(int j = 0; j < nthreads; j++) {
t[j].start();
}
// wait for threads to complete
for(int j = 0; j < nthreads; j++) {
try {
t[j].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.printf("%d threads : expected = %d, actual = %d\n", nthreads, njobs, d.getS().length());
}
public static void main(String[] args) {
test(2560, 1);
test(2560, 2);
test(2560, 4);
test(2560, 8);
test(2560, 16);
test(2560, 32);
test(2560, 64);
}
}
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2558
4 threads : expected = 2560, actual = 2506
8 threads : expected = 2560, actual = 2462
16 threads : expected = 2560, actual = 2450
32 threads : expected = 2560, actual = 2444
64 threads : expected = 2560, actual = 2403
using System;
using System.Threading;
namespace NotCorrect
{
public class Program
{
public class Data
{
public string S { get; set; }
}
public class Processor
{
private Data d;
public Processor(Data d)
{
this.d = d;
}
public void Run()
{
// simulate some work between get and set
string s = d.S;
Thread.Yield();
d.S = s + "X";
}
}
public static void Test(int njobs, int nthreads)
{
// setup data
Data d = new Data { S = "" };
// process with shared data
for(int i = 0; i < njobs / nthreads; i++)
{
// create threads
Thread[] t = new Thread[nthreads];
for(int j = 0; j < nthreads; j++)
{
t[j] = new Thread((new Processor(d)).Run);
}
// start threads
for(int j = 0; j < nthreads; j++)
{
t[j].Start();
}
// wait for threads to complete
for(int j = 0; j < nthreads; j++)
{
t[j].Join();
}
}
Console.WriteLine("{0} threads : expected = {1}, actual = {2}", nthreads, njobs, d.S.Length);
}
public static void Main(string[] args)
{
Test(2560, 1);
Test(2560, 2);
Test(2560, 4);
Test(2560, 8);
Test(2560, 16);
Test(2560, 32);
Test(2560, 64);
Console.ReadKey();
}
}
}
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2544
8 threads : expected = 2560, actual = 2535
16 threads : expected = 2560, actual = 2525
32 threads : expected = 2560, actual = 2509
64 threads : expected = 2560, actual = 2514
Imports System
Imports System.Threading
Namespace NotCorrect
Public Class Program
Public Class Data
Public Property S() As String
End Class
Public Class Processor
Private d As Data
Public Sub New(d As Data)
Me.d = d
End Sub
Public Sub Run()
' simulate some work between get and set
Dim s As String = d.S
Thread.Yield()
d.S = s & "X"
End Sub
End Class
Public Shared Sub Test(njobs As Integer, nthreads As Integer)
' setup data
Dim d As New Data() With { .S = "" }
' process with shared data
For i As Integer = 0 To njobs \ nthreads - 1
' create threads
Dim t As Thread() = New Thread(nthreads - 1) {}
For j As Integer = 0 To nthreads - 1
t(j) = New Thread(AddressOf (New Processor(d)).Run)
Next
' start threads
For j As Integer = 0 To nthreads - 1
t(j).Start()
Next
' wait for threads to complete
For j As Integer = 0 To nthreads - 1
t(j).Join()
Next
Next
Console.WriteLine("{0} threads : expected = {1}, actual = {2}", nthreads, njobs, d.S.Length)
End Sub
Public Shared Sub Main(args As String())
Test(2560, 1)
Test(2560, 2)
Test(2560, 4)
Test(2560, 8)
Test(2560, 16)
Test(2560, 32)
Test(2560, 64)
Console.ReadKey()
End Sub
End Class
End Namespace
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2558
8 threads : expected = 2560, actual = 2538
16 threads : expected = 2560, actual = 2534
32 threads : expected = 2560, actual = 2525
64 threads : expected = 2560, actual = 2492
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <windows.h>
#include <process.h>
#define MAX_STR_LEN 8192
struct data {
char s[MAX_STR_LEN];
};
unsigned int __stdcall run(void *p)
{
struct data *d;
char s[MAX_STR_LEN];
d = p;
strcpy(s, d->s);
Sleep(0);
strcpy(d->s, s);
strcat(d->s, "X");
return 0;
}
void test(int njobs, int nthreads)
{
struct data d;
HANDLE *t;
int i, j;
/* setup data */
strcpy(d.s, "");
/* process with shared data */
for(i = 0; i < njobs / nthreads; i++)
{
/* create and start threads */
t = malloc(nthreads * sizeof(HANDLE));
for(j = 0; j < nthreads; j++)
{
t[j] = (HANDLE)_beginthreadex(NULL, 0, run, &d, 0, NULL);
}
/* wait for threads to complete */
for(j = 0; j < nthreads; j++)
{
WaitForSingleObject(t[j], INFINITE);
CloseHandle(t[j]);
}
free(t);
}
printf("%d threads : expected = %d, actual = %d\n", nthreads, njobs, (int)strlen(d.s));
}
int main()
{
test(2560, 1);
test(2560, 2);
test(2560, 4);
test(2560, 8);
test(2560, 16);
test(2560, 32);
test(2560, 64);
return 0;
}
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2557
8 threads : expected = 2560, actual = 2556
16 threads : expected = 2560, actual = 2550
32 threads : expected = 2560, actual = 2526
64 threads : expected = 2560, actual = 2548
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>
#define MAX_STR_LEN 8192
struct data {
char s[MAX_STR_LEN];
};
void *run(void *p)
{
struct data *d;
char s[MAX_STR_LEN];
d = p;
strcpy(s, d->s);
usleep(1);
strcpy(d->s, s);
strcat(d->s, "X");
return NULL;
}
void test(int njobs, int nthreads)
{
struct data d;
pthread_t *t;
int i, j;
void *p;
/* setup data */
strcpy(d.s, "");
/* process with shared data */
for(i = 0; i < njobs / nthreads; i++)
{
/* create and start threads */
t = malloc(nthreads * sizeof(pthread_t));
for(j = 0; j < nthreads; j++)
{
pthread_create(&t[j], NULL, run, &d);
}
/* wait for threads to complete */
for(j = 0; j < nthreads; j++)
{
pthread_join(t[j], &p);
}
free(t);
}
printf("%d threads : expected = %d, actual = %d\n", nthreads, njobs, (int)strlen(d.s));
}
int main()
{
test(2560, 1);
test(2560, 2);
test(2560, 4);
test(2560, 8);
test(2560, 16);
test(2560, 32);
test(2560, 64);
return 0;
}
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 1378
4 threads : expected = 2560, actual = 670
8 threads : expected = 2560, actual = 326
16 threads : expected = 2560, actual = 162
32 threads : expected = 2560, actual = 81
64 threads : expected = 2560, actual = 40
#include <iostream>
#include <iomanip>
#include <string>
//using namespace std; // std may conflict with boost for newer C++
using std::string;
using std::cout;
using std::endl;
#include <boost/thread.hpp>
#include <boost/chrono.hpp>
using namespace boost;
using namespace boost::chrono;
class Data
{
private:
string s;
public:
Data(string s);
string GetS();
void SetS(string s);
};
Data::Data(string s)
{
this->s = s;
}
string Data::GetS()
{
return s;
}
void Data::SetS(string s)
{
this->s = s;
}
typedef Data *DataPtr;
void run(Data *d)
{
// simulate some work between get and set
string s = d->GetS();
this_thread::sleep_for(microseconds(1));
d->SetS(s + "X");
}
typedef thread *thread_ptr;
void test(int njobs, int nthreads)
{
// setup data
Data *d = new Data("");
// process with shared data
for(int i = 0; i < njobs / nthreads; i++)
{
// create and start threads
thread_ptr *t = new thread_ptr[nthreads];
for(int j = 0; j < nthreads; j++)
{
t[j] = new thread(run, d);
}
// wait for threads to complete
for(int j = 0; j < nthreads; j++)
{
t[j]->join();
delete t[j];
}
delete[] t;
}
cout << nthreads << " threads : expected = " << njobs << ", actual = " << d->GetS().length() << endl;
delete d;
}
int main()
{
test(2560, 1);
test(2560, 2);
test(2560, 4);
test(2560, 8);
test(2560, 16);
test(2560, 32);
test(2560, 64);
return 0;
}
Example output:
1 threads : expected = 2560, actual = 2560 (crashed)
#include <iostream>
#include <iomanip>
#include <string>
#include <thread>
#include <chrono>
using namespace std;
using namespace std::chrono;
class Data
{
private:
string s;
public:
Data(string s);
string GetS();
void SetS(string s);
};
Data::Data(string s)
{
this->s = s;
}
string Data::GetS()
{
return s;
}
void Data::SetS(string s)
{
this->s = s;
}
typedef Data *DataPtr;
void run(Data *d)
{
// simulate some work between get and set
string s = d->GetS();
this_thread::sleep_for(microseconds(1));
d->SetS(s + "X");
}
typedef thread *thread_ptr;
void test(int njobs, int nthreads)
{
// setup data
Data *d = new Data("");
// process with shared data
for(int i = 0; i < njobs / nthreads; i++)
{
// create and start threads
thread_ptr *t = new thread_ptr[nthreads];
for(int j = 0; j < nthreads; j++)
{
t[j] = new thread(run, d);
}
// wait for threads to complete
for(int j = 0; j < nthreads; j++)
{
t[j]->join();
delete t[j];
}
delete[] t;
}
cout << nthreads << " threads : expected = " << njobs << ", actual = " << d->GetS().length() << endl;
delete d;
}
int main()
{
test(2560, 1);
test(2560, 2);
test(2560, 4);
test(2560, 8);
test(2560, 16);
test(2560, 32);
test(2560, 64);
return 0;
}
Example output:
1 threads : expected = 2560, actual = 2560 (crashed)
#include <iostream>
#include <iomanip>
#include <string>
using namespace std;
#include <QtCore/QThread>
#include <QtCore/QTime>
class Data
{
private:
string s;
public:
Data(string s);
string GetS();
void SetS(string s);
};
Data::Data(string s)
{
this->s = s;
}
string Data::GetS()
{
return s;
}
void Data::SetS(string s)
{
this->s = s;
}
typedef Data *DataPtr;
class Processor : public QThread
{
private:
Data *d;
protected:
virtual void run();
public:
Processor(Data *d);
};
Processor::Processor(Data *d)
{
this->d = d;
}
void Processor::run()
{
// simulate some work between get and set
string s = d->GetS();
QThread::usleep(1);
d->SetS(s + "X");
}
typedef QThread *QThreadPtr;
void test(int njobs, int nthreads)
{
// setup data
Data *d = new Data("");
// process with shared data
for(int i = 0; i < njobs / nthreads; i++)
{
// create threads
QThreadPtr *t = new QThreadPtr[nthreads];
for(int j = 0; j < nthreads; j++)
{
t[j] = new Processor(d);
}
// start threads
for(int j = 0; j < nthreads; j++)
{
t[j]->start();
}
// wait for threads to complete
for(int j = 0; j < nthreads; j++)
{
t[j]->wait();
delete t[j];
}
delete[] t;
}
cout << nthreads << " threads : expected = " << njobs << ", actual = " << d->GetS().length() << endl;;
delete d;
}
void realmain()
{
test(2560, 1);
test(2560, 2);
test(2560, 4);
test(2560, 8);
test(2560, 16);
test(2560, 32);
test(2560, 64);
}
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 1280
4 threads : expected = 2560, actual = 640
(crashed)
program NotCorrect;
uses
Classes, SysUtils, Windows;
type
Data = class(TObject)
constructor Create(s : string);
function GetS : string;
procedure SetS(s : string);
property S : string read GetS write SetS;
private
_s : string;
end;
Processor = class(TThread)
constructor Create(d : Data);
function GetD : Data;
property D : Data read GetD;
protected
procedure Execute; override;
private
_d : Data;
end;
constructor Data.Create(s : string);
begin
_s := s;
end;
function Data.GetS : string;
begin
GetS := _s;
end;
procedure Data.SetS(s : string);
begin
_s := s;
end;
constructor Processor.Create(d : Data);
begin
inherited Create(true);
_d := d;
end;
function Processor.GetD : Data;
begin
GetD := _d;
end;
procedure Processor.Execute;
var
s : string;
begin
(* simulate some work between get and set *)
s := D.S;
Yield;
D.S := s + 'X';
end;
procedure Test(njobs, nthreads : integer);
var
d : Data;
t : array of TThread;
i, j : integer;
begin
(* setup data*)
d := Data.Create('');
(* process with shared data *)
for i := 0 to (njobs div nthreads - 1) do begin
SetLength(t, nthreads);
(* create threads *)
for j := 0 to nthreads-1 do begin
t[j] := Processor.Create(d);
end;
(* start threads *)
for j := 0 to nthreads-1 do begin
t[j].Start;
end;
(* wait for threads to complete *)
for j := 0 to nthreads-1 do begin
t[j].WaitFor;
t[j].Terminate;
t[j].Free;
end;
end;
writeln(nthreads:1,' threads : expected = ',njobs:1,', actual = ', length(d.S):1);
end;
begin
Test(2560, 1);
Test(2560, 2);
Test(2560, 4);
Test(2560, 8);
Test(2560, 16);
Test(2560, 32);
Test(2560, 64);
end.
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2559
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2332
import threading
import time
class Data(object):
    def __init__(self, _s):
        self.s = _s
class Processor(threading.Thread):
    def __init__(self, _d):
        super(Processor, self).__init__()
        self.d = _d
    def run(self):
        # simulate some work between get and set
        s = self.d.s
        time.sleep(0.001)
        self.d.s = s + "X"
def test(njobs, nthreads):
    # setup data
    d = Data("")
    # process
    for i in range(njobs // nthreads): # integer division, works in Python 2.x and 3.x
        # create threads
        t = []
        for j in range(nthreads):
            t.append(Processor(d))
        # start threads
        for j in range(nthreads):
            t[j].start()
        # wait for threads to complete
        for j in range(nthreads):
            t[j].join()
    print("%d threads : expected = %d, actual = %d" % (nthreads, njobs, len(d.s)))
test(2560, 1)
test(2560, 2)
test(2560, 4)
test(2560, 8)
test(2560, 16)
test(2560, 32)
test(2560, 64)
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 1280
4 threads : expected = 2560, actual = 640
8 threads : expected = 2560, actual = 320
16 threads : expected = 2560, actual = 160
32 threads : expected = 2560, actual = 80
64 threads : expected = 2560, actual = 40
(CPython; with Jython the results are more random.)
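The CPython numbers follow a simple pattern: actual = njobs / nthreads (2560 / 2 = 1280, 2560 / 4 = 640 and so on). That pattern is consistent with every thread in a batch reading the string before any of them writes it back, so each batch adds only a single X. The Java sketch below forces exactly that interleaving with a java.util.concurrent.CyclicBarrier between the get and the set. The barrier is an assumption standing in for the sleep, not something the original code does, and the class name BatchRace is made up for illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BatchRace {
    static class Data {
        // volatile so every thread sees the most recently written value
        volatile String s = "";
    }

    // Runs njobs unsynchronized updates in batches of nthreads threads.
    // Inside each batch all threads read the string before any thread writes.
    static int run(int njobs, int nthreads) throws InterruptedException {
        Data d = new Data();
        for (int i = 0; i < njobs / nthreads; i++) {
            // the barrier plays the role of the sleep between get and set
            CyclicBarrier barrier = new CyclicBarrier(nthreads);
            Thread[] t = new Thread[nthreads];
            for (int j = 0; j < nthreads; j++) {
                t[j] = new Thread(() -> {
                    String s = d.s;              // get
                    try {
                        barrier.await();         // wait until every thread has read
                    } catch (InterruptedException | BrokenBarrierException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    d.s = s + "X";               // set: all threads write the same value
                });
            }
            for (Thread th : t) th.start();
            for (Thread th : t) th.join();
        }
        return d.s.length();
    }

    public static void main(String[] args) throws InterruptedException {
        for (int n : new int[] {1, 2, 4, 8}) {
            System.out.printf("%d threads : expected = %d, actual = %d%n", n, 2560, run(2560, n));
        }
    }
}
```

Because each batch loses all but one update, the result is njobs / nthreads every time, which matches the CPython pattern; without the barrier the outcome depends on timing, as the Jython note suggests.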
program notcorrect
call test(2560, 1)
call test(2560, 2)
call test(2560, 4)
call test(2560, 8)
call test(2560, 16)
call test(2560, 32)
call test(2560, 64)
end
subroutine test(njobs, nthreads)
use omp_lib
implicit none
integer(kind=4) njobs, nthreads
integer(kind=4) i, j, k, t1, t2, slen, c
character(len=32768) s
call omp_set_num_threads(nthreads)
call system_clock(t1)
slen = 0
!$omp parallel private(i) shared(s, slen)
!$omp do
do i = 1, njobs
call run(s, slen)
end do
!$omp end do
!$omp end parallel
c = 0
do i = 1, njobs
if (s(i:i) .eq. 'X') then
c = c + 1
end if
end do
call system_clock(t2)
write(*,'(1x,I2,A,I4,A,I4)') nthreads,' threads : expected = ', njobs, ' actual = ', c
end
subroutine run(s, slen)
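use iso_c_binding
implicit none
! explicit interface: C usleep takes its argument by value
interface
subroutine usleep(us) bind(c, name='usleep')
use iso_c_binding
integer(c_int), value :: us
end subroutine usleep
end interface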
character(len=32768) s
integer(kind=4) slen
character(len=32768) temp
temp(1:slen) = s(1:slen)
call usleep(100)
slen = slen + 1
temp(slen:slen) = 'X'
s(1:slen) = temp(1:slen)
end
Example output:
1 threads : expected = 2560 actual = 2560
2 threads : expected = 2560 actual = 1642
4 threads : expected = 2560 actual = 1319
8 threads : expected = 2560 actual = 843
16 threads : expected = 2560 actual = 733
32 threads : expected = 2560 actual = 771
64 threads : expected = 2560 actual = 771
The above code is only guaranteed to work with 1 thread. With more than 1 thread it is not guaranteed to work, because reading the string and writing it back are two separate steps: if another thread reads the string in between, one of the two updates is lost. That does not mean that it is guaranteed to fail. It may happen to work some of the time or even most of the time. But it can fail.
Also note that it can fail in different ways. It can complete and produce a wrong result, but it can also crash the program.
And experience shows that incorrect multi-threaded code often works fine in test and only starts showing errors in production.
Now let us see the same code with proper synchronization in place, which guarantees that it runs correctly:
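A lost update can be replayed without any real threads. The sketch below (the class LostUpdate is hypothetical) executes the get and set steps of two would-be threads A and B in a fixed order. When B reads before A has written, B's write overwrites A's, and one X disappears. A real scheduler chooses the order at runtime, which is why unsynchronized code can give the right answer in one run and the wrong one in the next.

```java
public class LostUpdate {
    // Replays two "threads" A and B that each append 'X' to a shared string
    // using a read step and a write step. No real threads are involved; the
    // order of the four steps is chosen explicitly.
    static int simulate(boolean interleaved) {
        String shared = "";
        String a, b;
        if (interleaved) {
            a = shared;          // A reads ""
            b = shared;          // B reads "" before A has written
            shared = a + "X";    // A writes "X"
            shared = b + "X";    // B writes "X", overwriting A's update
        } else {
            a = shared;          // A reads ""
            shared = a + "X";    // A writes "X"
            b = shared;          // B reads "X"
            shared = b + "X";    // B writes "XX"
        }
        return shared.length();
    }

    public static void main(String[] args) {
        System.out.println("sequential : " + simulate(false)); // prints "sequential : 2"
        System.out.println("interleaved: " + simulate(true));  // prints "interleaved: 1"
    }
}
```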
package threads;
public class Correct {
public static class Data {
private String s;
public Data(String s) {
this.s = s;
}
public String getS() {
return s;
}
public void setS(String s) {
this.s = s;
}
}
public static class Processor extends Thread {
private Data d;
public Processor(Data d) {
this.d = d;
}
@Override
public void run() {
// simulate some work between get and set
synchronized(d) {
String s = d.getS();
Thread.yield();
d.setS(s + 'X');
}
}
}
public static void test(int njobs, int nthreads) {
// setup data
Data d = new Data("");
// process with shared data
for(int i = 0; i < njobs / nthreads; i++) {
// create threads
Thread[] t = new Thread[nthreads];
for(int j = 0; j < nthreads; j++) {
t[j] = new Processor(d);
}
// start threads
for(int j = 0; j < nthreads; j++) {
t[j].start();
}
// wait for threads to complete
for(int j = 0; j < nthreads; j++) {
try {
t[j].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.printf("%d threads : expected = %d, actual = %d\n", nthreads, njobs, d.getS().length());
}
public static void main(String[] args) {
test(2560, 1);
test(2560, 2);
test(2560, 4);
test(2560, 8);
test(2560, 16);
test(2560, 32);
test(2560, 64);
}
}
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560
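Besides synchronized blocks, Java also offers explicit locks in java.util.concurrent.locks. Below is a minimal sketch of the same test using ReentrantLock. The class name CorrectLock is made up for illustration. The unlock sits in a finally block, so the lock is released even if the protected code throws.

```java
import java.util.concurrent.locks.ReentrantLock;

public class CorrectLock {
    static class Data {
        private String s = "";
        String getS() { return s; }
        void setS(String s) { this.s = s; }
    }

    // one lock shared by all threads that access the Data object
    private static final ReentrantLock LOCK = new ReentrantLock();

    static int test(int njobs, int nthreads) throws InterruptedException {
        Data d = new Data();
        for (int i = 0; i < njobs / nthreads; i++) {
            Thread[] t = new Thread[nthreads];
            for (int j = 0; j < nthreads; j++) {
                t[j] = new Thread(() -> {
                    LOCK.lock();
                    try {
                        // get, yield and set run as one protected sequence
                        String s = d.getS();
                        Thread.yield();
                        d.setS(s + "X");
                    } finally {
                        LOCK.unlock(); // always released, even on exceptions
                    }
                });
            }
            for (Thread th : t) th.start();
            for (Thread th : t) th.join();
        }
        return d.getS().length();
    }

    public static void main(String[] args) throws InterruptedException {
        for (int n : new int[] {1, 2, 4, 8, 16, 32, 64}) {
            System.out.printf("%d threads : expected = %d, actual = %d%n", n, 2560, test(2560, n));
        }
    }
}
```

Compared to synchronized, an explicit lock adds options such as tryLock, at the cost of having to remember the finally block.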
using System;
using System.Threading;
namespace Correct
{
public class Program
{
public class Data
{
public string S { get; set; }
}
public class Processor
{
private Data d;
public Processor(Data d)
{
this.d = d;
}
public void Run()
{
// simulate some work between get and set
lock(d)
{
string s = d.S;
Thread.Yield();
d.S = s + "X";
}
}
}
public static void Test(int njobs, int nthreads)
{
// setup data
Data d = new Data { S = "" };
// process with shared data
for(int i = 0; i < njobs / nthreads; i++)
{
// create threads
Thread[] t = new Thread[nthreads];
for(int j = 0; j < nthreads; j++)
{
t[j] = new Thread((new Processor(d)).Run);
}
// start threads
for(int j = 0; j < nthreads; j++)
{
t[j].Start();
}
// wait for threads to complete
for(int j = 0; j < nthreads; j++)
{
t[j].Join();
}
}
Console.WriteLine("{0} threads : expected = {1}, actual = {2}", nthreads, njobs, d.S.Length);
}
public static void Main(string[] args)
{
Test(2560, 1);
Test(2560, 2);
Test(2560, 4);
Test(2560, 8);
Test(2560, 16);
Test(2560, 32);
Test(2560, 64);
Console.ReadKey();
}
}
}
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560
Imports System
Imports System.Threading
Namespace Correct
Public Class Program
Public Class Data
Public Property S() As String
End Class
Public Class Processor
Private d As Data
Public Sub New(d As Data)
Me.d = d
End Sub
Public Sub Run()
' simulate some work between get and set
SyncLock d
Dim s As String = d.S
Thread.Yield()
d.S = s & "X"
End SyncLock
End Sub
End Class
Public Shared Sub Test(njobs As Integer, nthreads As Integer)
' setup data
Dim d As New Data() With { .S = "" }
' process with shared data
For i As Integer = 0 To njobs \ nthreads - 1
' create threads
Dim t As Thread() = New Thread(nthreads - 1) {}
For j As Integer = 0 To nthreads - 1
t(j) = New Thread(AddressOf (New Processor(d)).Run)
Next
' start threads
For j As Integer = 0 To nthreads - 1
t(j).Start()
Next
' wait for threads to complete
For j As Integer = 0 To nthreads - 1
t(j).Join()
Next
Next
Console.WriteLine("{0} threads : expected = {1}, actual = {2}", nthreads, njobs, d.S.Length)
End Sub
Public Shared Sub Main(args As String())
Test(2560, 1)
Test(2560, 2)
Test(2560, 4)
Test(2560, 8)
Test(2560, 16)
Test(2560, 32)
Test(2560, 64)
Console.ReadKey()
End Sub
End Class
End Namespace
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <windows.h>
#include <process.h>
#define MAX_STR_LEN 8192
struct data {
char s[MAX_STR_LEN];
};
static CRITICAL_SECTION cs;
unsigned int __stdcall run(void *p)
{
struct data *d;
char s[MAX_STR_LEN];
d = p;
EnterCriticalSection(&cs);
strcpy(s, d->s);
Sleep(0);
strcpy(d->s, s);
strcat(d->s, "X");
LeaveCriticalSection(&cs);
return 0;
}
void test(int njobs, int nthreads)
{
struct data d;
HANDLE *t;
int i, j;
/* setup data */
strcpy(d.s, "");
/* process with shared data */
InitializeCriticalSection(&cs);
for(i = 0; i < njobs / nthreads; i++)
{
/* create and start threads */
t = malloc(nthreads * sizeof(HANDLE));
for(j = 0; j < nthreads; j++)
{
t[j] = (HANDLE)_beginthreadex(NULL, 0, run, &d, 0, NULL);
}
/* wait for threads to complete */
for(j = 0; j < nthreads; j++)
{
WaitForSingleObject(t[j], INFINITE);
CloseHandle(t[j]);
}
free(t);
}
DeleteCriticalSection(&cs);
printf("%d threads : expected = %d, actual = %d\n", nthreads, njobs, (int)strlen(d.s));
}
int main()
{
test(2560, 1);
test(2560, 2);
test(2560, 4);
test(2560, 8);
test(2560, 16);
test(2560, 32);
test(2560, 64);
return 0;
}
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>
#define MAX_STR_LEN 8192
struct data {
char s[MAX_STR_LEN];
};
static pthread_mutex_t mtx;
void *run(void *p)
{
struct data *d;
char s[MAX_STR_LEN];
d = p;
pthread_mutex_lock(&mtx);
strcpy(s, d->s);
usleep(1);
strcpy(d->s, s);
strcat(d->s, "X");
pthread_mutex_unlock(&mtx);
return NULL;
}
void test(int njobs, int nthreads)
{
struct data d;
pthread_t *t;
int i, j;
void *p;
pthread_mutex_init(&mtx, NULL);
/* setup data */
strcpy(d.s, "");
/* process with shared data */
for(i = 0; i < njobs / nthreads; i++)
{
/* create and start threads */
t = malloc(nthreads * sizeof(pthread_t));
for(j = 0; j < nthreads; j++)
{
pthread_create(&t[j], NULL, run, &d);
}
/* wait for threads to complete */
for(j = 0; j < nthreads; j++)
{
pthread_join(t[j], &p);
}
free(t);
}
printf("%d threads : expected = %d, actual = %d\n", nthreads, njobs, (int)strlen(d.s));
pthread_mutex_destroy(&mtx);
}
int main()
{
test(2560, 1);
test(2560, 2);
test(2560, 4);
test(2560, 8);
test(2560, 16);
test(2560, 32);
test(2560, 64);
return 0;
}
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560
#include <iostream>
#include <iomanip>
#include <string>
//using namespace std; // std may conflict with boost for newer C++
using std::string;
using std::cout;
using std::endl;
#include <boost/thread.hpp>
#include <boost/chrono.hpp>
using namespace boost;
using namespace boost::chrono;
class Data
{
private:
string s;
public:
Data(string s);
string GetS();
void SetS(string s);
};
Data::Data(string s)
{
this->s = s;
}
string Data::GetS()
{
return s;
}
void Data::SetS(string s)
{
this->s = s;
}
typedef Data *DataPtr;
mutex mtx;
void run(Data *d)
{
// simulate some work between get and set
mtx.lock();
string s = d->GetS();
this_thread::sleep_for(microseconds(1));
d->SetS(s + "X");
mtx.unlock();
}
typedef thread *thread_ptr;
void test(int njobs, int nthreads)
{
// setup data
Data *d = new Data("");
// process with shared data
for(int i = 0; i < njobs / nthreads; i++)
{
// create and start threads
thread_ptr *t = new thread_ptr[nthreads];
for(int j = 0; j < nthreads; j++)
{
t[j] = new thread(run, d);
}
// wait for threads to complete
for(int j = 0; j < nthreads; j++)
{
t[j]->join();
delete t[j];
}
delete[] t;
}
cout << nthreads << " threads : expected = " << njobs << ", actual = " << d->GetS().length() << endl;
delete d;
}
int main()
{
test(2560, 1);
test(2560, 2);
test(2560, 4);
test(2560, 8);
test(2560, 16);
test(2560, 32);
test(2560, 64);
return 0;
}
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560
#include <iostream>
#include <iomanip>
#include <string>
#include <thread>
#include <mutex>
#include <chrono>
using namespace std;
using namespace std::chrono;
class Data
{
private:
string s;
public:
Data(string s);
string GetS();
void SetS(string s);
};
Data::Data(string s)
{
this->s = s;
}
string Data::GetS()
{
return s;
}
void Data::SetS(string s)
{
this->s = s;
}
typedef Data *DataPtr;
mutex mtx;
void run(Data *d)
{
// simulate some work between get and set
mtx.lock();
string s = d->GetS();
this_thread::sleep_for(microseconds(1));
d->SetS(s + "X");
mtx.unlock();
}
typedef thread *thread_ptr;
void test(int njobs, int nthreads)
{
// setup data
Data *d = new Data("");
// process with shared data
for(int i = 0; i < njobs / nthreads; i++)
{
// create and start threads
thread_ptr *t = new thread_ptr[nthreads];
for(int j = 0; j < nthreads; j++)
{
t[j] = new thread(run, d);
}
// wait for threads to complete
for(int j = 0; j < nthreads; j++)
{
t[j]->join();
delete t[j];
}
delete[] t;
}
cout << nthreads << " threads : expected = " << njobs << ", actual = " << d->GetS().length() << endl;
delete d;
}
int main()
{
test(2560, 1);
test(2560, 2);
test(2560, 4);
test(2560, 8);
test(2560, 16);
test(2560, 32);
test(2560, 64);
return 0;
}
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560
#include <iostream>
#include <iomanip>
#include <string>
using namespace std;
#include <QtCore/QThread>
#include <QtCore/QMutex>
#include <QtCore/QTime>
class Data
{
private:
string s;
public:
Data(string s);
string GetS();
void SetS(string s);
};
Data::Data(string s)
{
this->s = s;
}
string Data::GetS()
{
return s;
}
void Data::SetS(string s)
{
this->s = s;
}
typedef Data *DataPtr;
class Processor : public QThread
{
private:
Data *d;
protected:
virtual void run();
public:
Processor(Data *d);
};
QMutex mtx;
Processor::Processor(Data *d)
{
this->d = d;
}
void Processor::run()
{
// simulate some work between get and set
mtx.lock();
string s = d->GetS();
QThread::usleep(1);
d->SetS(s + "X");
mtx.unlock();
}
typedef QThread *QThreadPtr;
void test(int njobs, int nthreads)
{
// setup data
Data *d = new Data("");
// process with shared data
for(int i = 0; i < njobs / nthreads; i++)
{
// create threads
QThreadPtr *t = new QThreadPtr[nthreads];
for(int j = 0; j < nthreads; j++)
{
t[j] = new Processor(d);
}
// start threads
for(int j = 0; j < nthreads; j++)
{
t[j]->start();
}
// wait for threads to complete
for(int j = 0; j < nthreads; j++)
{
t[j]->wait();
delete t[j];
}
delete[] t;
}
cout << nthreads << " threads : expected = " << njobs << ", actual = " << d->GetS().length() << endl;
delete d;
}
void realmain()
{
test(2560, 1);
test(2560, 2);
test(2560, 4);
test(2560, 8);
test(2560, 16);
test(2560, 32);
test(2560, 64);
}
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560
program Correct;
uses
Classes, SysUtils, Windows;
var
cs : TRTLCriticalSection;
type
Data = class(TObject)
constructor Create(s : string);
function GetS : string;
procedure SetS(s : string);
property S : string read GetS write SetS;
private
_s : string;
end;
Processor = class(TThread)
constructor Create(d : Data);
function GetD : Data;
property D : Data read GetD;
protected
procedure Execute; override;
private
_d : Data;
end;
constructor Data.Create(s : string);
begin
_s := s;
end;
function Data.GetS : string;
begin
GetS := _s;
end;
procedure Data.SetS(s : string);
begin
_s := s;
end;
constructor Processor.Create(d : Data);
begin
inherited Create(true);
_d := d;
end;
function Processor.GetD : Data;
begin
GetD := _d;
end;
procedure Processor.Execute;
var
s : string;
begin
(* simulate some work between get and set *)
EnterCriticalSection(cs);
s := D.S;
Yield;
D.S := s + 'X';
LeaveCriticalSection(cs);
end;
procedure Test(njobs, nthreads : integer);
var
d : Data;
t : array of TThread;
i, j : integer;
begin
InitCriticalSection(cs);
(* setup data*)
d := Data.Create('');
(* process with shared data *)
for i := 0 to (njobs div nthreads - 1) do begin
SetLength(t, nthreads);
(* create threads *)
for j := 0 to nthreads-1 do begin
t[j] := Processor.Create(d);
end;
(* start threads *)
for j := 0 to nthreads-1 do begin
t[j].Start;
end;
(* wait for threads to complete *)
for j := 0 to nthreads-1 do begin
t[j].WaitFor;
t[j].Terminate;
t[j].Free;
end;
end;
writeln(nthreads:1,' threads : expected = ',njobs:1,', actual = ', length(d.S):1);
DeleteCriticalSection(cs);
end;
begin
Test(2560, 1);
Test(2560, 2);
Test(2560, 4);
Test(2560, 8);
Test(2560, 16);
Test(2560, 32);
Test(2560, 64);
end.
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560
import threading
import time
class Data(object):
    def __init__(self, _s):
        self.s = _s
lck = threading.RLock()
class Processor(threading.Thread):
    def __init__(self, _d):
        super(Processor, self).__init__()
        self.d = _d
    def run(self):
        # simulate some work between get and set
        with lck: # acquires the lock and always releases it, even on exceptions
            s = self.d.s
            time.sleep(0.001)
            self.d.s = s + "X"
def test(njobs, nthreads):
    # setup data
    d = Data("")
    # process
    for i in range(njobs // nthreads): # // is integer division in both Python 2.x and 3.x
        # create threads
        t = []
        for j in range(nthreads):
            t.append(Processor(d))
        # start threads
        for j in range(nthreads):
            t[j].start()
        # wait for threads to complete
        for j in range(nthreads):
            t[j].join()
    print("%d threads : expected = %d, actual = %d" % (nthreads, njobs, len(d.s)))
test(2560, 1)
test(2560, 2)
test(2560, 4)
test(2560, 8)
test(2560, 16)
test(2560, 32)
test(2560, 64)
Example output:
1 threads : expected = 2560, actual = 2560
2 threads : expected = 2560, actual = 2560
4 threads : expected = 2560, actual = 2560
8 threads : expected = 2560, actual = 2560
16 threads : expected = 2560, actual = 2560
32 threads : expected = 2560, actual = 2560
64 threads : expected = 2560, actual = 2560
program correct
call test(2560, 1)
call test(2560, 2)
call test(2560, 4)
call test(2560, 8)
call test(2560, 16)
call test(2560, 32)
call test(2560, 64)
end
subroutine test(njobs, nthreads)
use omp_lib
implicit none
integer(kind=4) njobs, nthreads
integer(kind=4) i, j, k, t1, t2, slen, c
character(len=32768) s
call omp_set_num_threads(nthreads)
call system_clock(t1)
slen = 0
!$omp parallel private(i) shared(s, slen)
!$omp do
do i = 1, njobs
call run(s, slen)
end do
!$omp end do
!$omp end parallel
c = 0
do i = 1, njobs
if (s(i:i) .eq. 'X') then
c = c + 1
end if
end do
call system_clock(t2)
write(*,'(1x,I2,A,I4,A,I4)') nthreads,' threads : expected = ', njobs, ' actual = ', c
end
subroutine run(s, slen)
use omp_lib
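use iso_c_binding
implicit none
! explicit interface: C usleep takes its argument by value
interface
subroutine usleep(us) bind(c, name='usleep')
use iso_c_binding
integer(c_int), value :: us
end subroutine usleep
end interface
character(len=32768) s
integer(kind=4) slen
character(len=32768) temp
! only one thread at a time executes the critical region
! (a sections construct must not be nested inside the enclosing omp do)
!$omp critical
temp(1:slen) = s(1:slen)
call usleep(100)
slen = slen + 1
temp(slen:slen) = 'X'
s(1:slen) = temp(1:slen)
!$omp end critical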
end
Example output:
1 threads : expected = 2560 actual = 2560
2 threads : expected = 2560 actual = 2560
4 threads : expected = 2560 actual = 2560
8 threads : expected = 2560 actual = 2560
16 threads : expected = 2560 actual = 2560
32 threads : expected = 2560 actual = 2560
64 threads : expected = 2560 actual = 2560
It is common for desktop GUI frameworks to have all GUI interactions running in an event thread.
This means that any attempt to do something long running directly in a GUI action will hang the GUI. Instead, a new thread should be started to do the long running task; that keeps the GUI responsive.
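The event thread pattern can be sketched without a real GUI toolkit. Below, a single-thread ExecutorService plays the role of the event thread. This is an assumption for illustration; Swing, JavaFX and other frameworks have their own APIs for it (Swing, for example, has SwingUtilities.invokeLater). The hypothetical click handler starts a worker thread for the slow task and returns at once. The worker then posts its result back to the event thread, which is the only thread that touches the "GUI" state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EventThreadSketch {
    // Hypothetical stand-in for a GUI event thread: one thread that runs event
    // handlers one at a time. This is NOT the API of Swing, JavaFX or any toolkit.
    static final ExecutorService EVENT_THREAD = Executors.newSingleThreadExecutor();
    static final List<String> LOG = Collections.synchronizedList(new ArrayList<>());
    static final CountDownLatch DONE = new CountDownLatch(1);

    // Event handler: runs on the event thread and must return quickly.
    static void onButtonClicked() {
        LOG.add("clicked");
        // Start a worker thread for the long running task instead of running it here.
        new Thread(() -> {
            try {
                Thread.sleep(200); // simulated long running task
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Hand the result back to the event thread, which owns the "GUI".
            EVENT_THREAD.execute(() -> {
                LOG.add("result shown");
                DONE.countDown();
            });
        }).start();
        LOG.add("handler returned"); // the event thread is free again immediately
    }

    static List<String> demo() throws InterruptedException {
        EVENT_THREAD.execute(EventThreadSketch::onButtonClicked);
        DONE.await();            // wait until the result has been shown
        EVENT_THREAD.shutdown();
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints [clicked, handler returned, result shown]
    }
}
```

The order is fixed: the result task can only run after the handler has returned, because the event thread processes one task at a time.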
For standalone server applications it is up to the developer to manage threads, but for applications running within a server framework it is common for the framework to handle threads.
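For a standalone server, one common way to manage threads is a fixed-size pool. A bounded number of threads then handles any number of requests, similar in spirit to the worker pools many server frameworks use. Below is a sketch using Executors.newFixedThreadPool. The method handle is a hypothetical placeholder for real request processing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoolSketch {
    // Hypothetical request handler; a real server would parse a request
    // and build a response here.
    static String handle(int requestId) {
        return "response " + requestId + " from " + Thread.currentThread().getName();
    }

    static List<String> serve(int nrequests, int poolSize) throws InterruptedException, ExecutionException {
        // A fixed pool: at most poolSize threads serve all requests.
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < nrequests; i++) {
                final int id = i;
                futures.add(pool.submit(() -> handle(id)));
            }
            List<String> responses = new ArrayList<>();
            for (Future<String> f : futures) {
                responses.add(f.get()); // waits for that request to complete
            }
            return responses;
        } finally {
            pool.shutdown(); // lets the pool threads exit
        }
    }

    public static void main(String[] args) throws Exception {
        for (String r : serve(10, 4)) System.out.println(r);
    }
}
```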
Java web container (Tomcat, Jetty etc.):
Full Java EE application server (JBoss, WebSphere, WebLogic etc.):
ASP.NET:
Apache 2.x, mod_php and PHP:
Version | Date | Description |
---|---|---|
1.0 | August 31st 2018 | Initial version based on old article (in Danish) on Eksperten.dk |
1.1 | December 23rd 2018 | Add Python |
1.2 | November 23rd 2019 | Add C++ Boost and C++ 11 |
1.3 | November 27th 2019 | Add C++ Qt |
1.4 | July 27th 2021 | Add Fortran OpenMP |
Please send comments to Arne Vajhøj