Actor

Content:

  1. Introduction
  2. Concept
  3. Implementations
  4. Basic example
  5. Threading
  6. Conclusion

Introduction:

The actor model is an old model for concurrent programming. The theory goes back to the 1970's and 1980's.

It has a very strong theoretical/mathematical background.

It got a practical boost when Erlang showed up.

Later the paradigm has beed adopted by many modern languages. Typical though as a library not as a language builtin.

One of the most well known actor frameworks is Akka (JVM) / Akka.NET (.NET).

This article will do a brief introduction to actors.

Concept:

An actor is an object that only communicate by receiving messages via some sort of pipe.

A model like:

Actor concept

The actor processes messages like:

loop {
    msg = receive_message()
    switch(msg) {
        value_A => ...
        value_B => ...
        type_X => ...
        type_Y => ...
    }
}

Callers work like:

actor = get_actor_ref()
actor.tell(msg)

(besides the tell method the API usually also have an ask method for waiting on response from actor)

Actors are besides internal computation only allowed to:

A given actor instance is only processing one message at a time.

So by using actors all the usual potential concurrency problems are eliminated.

Implementations:

Different languages/technologies comes with different Actor frameworks.

Among the most common are:

Actor framework Languages
Erlang builtin Erlang
Scala builtin Scala (older versions)
Akka Java
Scala (newer versions)
Pekko Java
Scala (newer versions)
Akka.NET C#
F#
Pykka Python
CZMQ C
CAF C++

Apache Pekko is a clone of Akka that was created after the license for Akka was changed.

All the Akka code below will work with Pekko if changing imports from akka.something to org.apache.pekko.something (tested with Akka 2.8.3 and Pekko 1.0.2).

Basic example:

As an example of the most basic stuff we will use a model like:

Actor basic example
package actor.scala

import scala.actors.Actor

object Test2 extends Actor {
  object Command extends Enumeration {
    type Command  = Value
    val DONE = Value
  }
  case class Message(val text: String)
  def act() {
    while(true) {
      receive {
        case Command.DONE => exit()
        case msg: Message => {
          println(s"Test2: ${msg.text} (thread: ${Thread.currentThread().getId()})")
          Test1 ! Test1.Command.ACK
        }
      }
    }
  }
}

object Test1 extends Actor {
  object Command extends Enumeration {
    type Command  = Value
    val ACK, DONE = Value
  }
  case class Message(val text: String)
  def act() {
    Test2.start()
    while(true) {
      receive {
        case Command.ACK => println(s"Test1: ACK from Test2 (thread: ${Thread.currentThread().getId()})")
        case Command.DONE => {
          Test2 ! Test2.Command.DONE
          exit()
        }
        case msg: Message => {
          println(s"Test1: ${msg.text} (thread: ${Thread.currentThread().getId()})")
          Test2 ! new Test2.Message(msg.text)
        }
      }
    }
  }
}

object Test {
  def main(args: Array[String]): Unit = {
    println(s"main (thread: ${Thread.currentThread().getId()})")
    Test1.start()
    Test1 ! new Test1.Message("Hello world!")
    Test1 ! new Test1.Message("Hello world!")
    Test1 ! new Test1.Message("Hello world!")
    Thread.sleep(100)
    Test1 ! Test1.Command.DONE
  }
}
package actor.akka.classic.untyped

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.PoisonPill
import akka.actor.Props

object Test2Command extends Enumeration {
  type Command  = Value
  val DONE = Value
}
case class Test2Message(val text: String)

class Test2 extends Actor {
  def receive = {
    case Test2Command.DONE => self ! PoisonPill
    case msg: Test2Message => {
      println(s"Test2: ${msg.text} (thread: ${Thread.currentThread().getId()})")
      sender() ! Test1Command.ACK
    }
    case _ => println("Unknown message")
  }
}

object Test1Command extends Enumeration {
  type Command  = Value
  val ACK, DONE = Value
}
case class Test1Message(val text: String)

class Test1 extends Actor {
  val test2 = context.actorOf(Props[Test2]())
  def receive = {
    case Test1Command.ACK => println(s"Test1: ACK from Test2 (thread: ${Thread.currentThread().getId()})")
    case Test1Command.DONE => {
      test2 ! Test2Command.DONE
      self ! PoisonPill
    }
    case msg: Test1Message => {
      println(s"Test1: ${msg.text} (thread: ${Thread.currentThread().getId()})")
      test2 ! new Test2Message(msg.text)
    }
    case _ => println("Unknown message")
  }
}

object Test {
  def main(args: Array[String]): Unit = {
    println(s"main (thread: ${Thread.currentThread().getId()})")
    val akka = ActorSystem.create("Demo")
    val test1 = akka.actorOf(Props[Test1]())
    test1 ! new Test1Message("Hello world!")
    test1 ! new Test1Message("Hello world!")
    test1 ! new Test1Message("Hello world!")
    Thread.sleep(100)
    test1 ! Test1Command.DONE
    akka.terminate()
  }
}
package actor.akka.classic.typed

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.TypedActor
import akka.actor.TypedProps

trait Test2 {
  def done(): Unit
  def message(msg: String, sender: Test1): Unit
}

class Test2Impl extends Test2 {
  def done(): Unit = {
    TypedActor.get(TypedActor.context).stop(this)
  }
  def message(msg: String, sender: Test1): Unit = {
    println(s"Test2: ${msg} (thread: ${Thread.currentThread().getId()})")
    sender.ack()
  }
}

trait Test1 {
  def ack(): Unit
  def done(): Unit
  def message(msg: String): Unit
}

class Test1Impl() extends Test1 {
  val test2: Test2 = TypedActor(TypedActor.context).typedActorOf(TypedProps[Test2Impl]())
  def ack(): Unit = {
    println(s"Test1: ACK from Test2 (thread: ${Thread.currentThread().getId()})")
  }
  def done(): Unit = {
    test2.done()
    TypedActor.get(TypedActor.context).stop(this)
  }
  def message(msg: String): Unit = {
    println(s"Test1: ${msg} (thread: ${Thread.currentThread().getId()})")
    test2.message(msg, TypedActor.self)
  }
}

object Test {
  def main(args: Array[String]): Unit = {
    println(s"main (thread: ${Thread.currentThread().getId()})")
    val akka = ActorSystem.create("Demo")
    val test1: Test1 = TypedActor(akka).typedActorOf(TypedProps[Test1Impl]())
    test1.message("Hello world!")
    test1.message("Hello world!")
    test1.message("Hello world!")
    Thread.sleep(100)
    test1.done()
    akka.terminate()
  }
}
package actor.akka.typed.tell

import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

object Test2 {
  val DONE = "__DONE__"
  case class Message(val text: String, val sender: ActorRef[Test1.Message])
  def apply(): Behavior[Message] = Behaviors.receive { (ctx, msg) =>
    msg.text match {
      case DONE => Behaviors.stopped
      case _ => {
        println(s"Test2: ${msg.text} (thread: ${Thread.currentThread().getId()})")
        msg.sender ! Test1.Message(Test1.ACK)
        Behaviors.same
      }
    }
  }
}

object Test1 {
  val ACK = "__ACK__"
  val DONE = "__DONE__"
  case class Message(val text: String)
  def apply(): Behavior[Message] = Behaviors.setup { ctx =>
      val test2 = ctx.spawn(Test2(), "Test2")
      Behaviors.receive { (ctx, msg) =>
        msg.text match {
          case ACK => {
            println(s"Test1: ACK from Test2 (thread: ${Thread.currentThread().getId()})")
            Behaviors.same
          }
          case DONE => {
            test2 ! Test2.Message(Test2.DONE, ctx.self)
            Behaviors.stopped
          }
          case _ => {
            println(s"Test1: ${msg.text} (thread: ${Thread.currentThread().getId()})")
            test2 ! new Test2.Message(msg.text, ctx.self)
            Behaviors.same
        }
      }
    }
  }
}

object Test {
  def main(args: Array[String]): Unit = {
    println(s"main (thread: ${Thread.currentThread().getId()})")
    val test1 = ActorSystem.create(Test1(), "Test1")
    test1 ! new Test1.Message("Hello world!")
    test1 ! new Test1.Message("Hello world!")
    test1 ! new Test1.Message("Hello world!")
    Thread.sleep(100)
    test1 ! new Test1.Message(Test1.DONE)
    test1.terminate()
  }
}
package actor.akka.typed.ask

import scala.util.Failure
import scala.util.Success
import scala.concurrent.duration._

import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.util.Timeout

object Test2 {
  val DONE = "__DONE__"
  case class Message(val text: String, val sender: ActorRef[Test1.Message])
  def apply(): Behavior[Message] = Behaviors.receive { (ctx, msg) =>
    msg.text match {
      case DONE => Behaviors.stopped
      case _ => {
        println(s"Test2: ${msg.text} (thread: ${Thread.currentThread().getId()})")
        msg.sender ! Test1.Message(Test1.ACK)
        Behaviors.same
      }
    }
  }
}

object Test1 {
  val ACK = "__ACK__"
  val DONE = "__DONE__"
  case class Message(val text: String)
  def apply(): Behavior[Message] = Behaviors.setup { ctx =>
      val test2 = ctx.spawn(Test2(), "Test2")
      Behaviors.receive { (ctx, msg) =>
        msg.text match {
          case ACK => {
            println(s"Test1: ACK from Test2 (thread: ${Thread.currentThread().getId()})")
            Behaviors.same
          }
          case DONE => {
            test2 ! Test2.Message(Test2.DONE, ctx.self)
            Behaviors.stopped
          }
          case _ => {
            println(s"Test1: ${msg.text} (thread: ${Thread.currentThread().getId()})")
            implicit val duration: Timeout = Duration(100, MILLISECONDS)
            implicit val scheduler = ctx.system.scheduler
            implicit val ec = ctx.system.executionContext
            val fut = test2 ? ((me: ActorRef[Message]) => new Test2.Message(msg.text, me))
            fut.onComplete {
              case Success(msg) => println(s"Reply received: ${msg.text.replace("_", "")} (thread: ${Thread.currentThread().getId()})")
              case Failure(ex) => println("ooops")
            }
            Behaviors.same
        }
      }
    }
  }
}

object Test {
  def main(args: Array[String]): Unit = {
    println(s"main (thread: ${Thread.currentThread().getId()})")
    val test1 = ActorSystem.create(Test1(), "Test1")
    test1 ! new Test1.Message("Hello world!")
    test1 ! new Test1.Message("Hello world!")
    test1 ! new Test1.Message("Hello world!")
    Thread.sleep(100)
    test1 ! new Test1.Message(Test1.DONE)
    test1.terminate()
  }
}
package actor.akka.classic.onreceive;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedAbstractActor;

public class Test1 extends UntypedAbstractActor {
    public static enum Command {
        ACK,
        DONE
    }
    public static class Message {
        private String text;
        public Message(String text) {
            this.text = text;
        }
        public String getText() {
            return text;
        }
    }
    private ActorRef test2;
    @Override
    public void preStart() {
        test2 = getContext().actorOf(Props.create(Test2.class));
    }
    @Override
    public void onReceive(Object o) throws Throwable {
        if(o == Command.ACK) {
            System.out.printf("test1: ACK from test2  (thread: %d)\n", Thread.currentThread().getId());
        } else if(o == Command.DONE) {
            test2.tell(Test2.Command.DONE, getSelf());
            getContext().stop(getSelf());
        } else if(o instanceof Message) {
            Message msg = (Message)o;
            System.out.printf("test1: %s (thread: %d)\n", msg.getText(), Thread.currentThread().getId());
            test2.tell(new Test2.Message(msg.getText(), getSelf()), getSelf());
        } else {
            System.out.printf("Unknown message: %s (%s)\n", o.toString(), o.getClass().getName());
        }
    }
}
package actor.akka.classic.onreceive;

import akka.actor.ActorRef;
import akka.actor.UntypedAbstractActor;

public class Test2 extends UntypedAbstractActor {
    public static enum Command {
        DONE
    }
    public static class Message {
        private String text;
        private ActorRef sender;
        public Message(String text, ActorRef sender) {
            this.text = text;
            this.sender = sender;
        }
        public String getText() {
            return text;
        }
        public ActorRef getSender() {
            return sender;
        }
    }
    @Override
    public void onReceive(Object o) throws Throwable {
        if(o == Command.DONE) {
            getContext().stop(getSelf());
        } else if(o instanceof Message) {
            Message msg = (Message)o;
            System.out.printf("test2: %s (thread: %d)\n", msg.getText(), Thread.currentThread().getId());
            msg.getSender().tell(Test1.Command.ACK, getSelf());
        } else {
            System.out.printf("Unknown message: %s (%s)\n", o.toString(), o.getClass().getName());
        }
    }
}
package actor.akka.classic.onreceive;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class TestMain {
    public static void main(String[] args) throws InterruptedException {
        System.out.printf("main (thread: %d)\n", Thread.currentThread().getId());
        ActorSystem akka = ActorSystem.create();
        ActorRef test1 = akka.actorOf(Props.create(Test1.class));
        test1.tell(new Test1.Message("Hello world!"), ActorRef.noSender());
        test1.tell(new Test1.Message("Hello world!"), ActorRef.noSender());
        test1.tell(new Test1.Message("Hello world!"), ActorRef.noSender());
        Thread.sleep(100); // wait for everything to complete
        test1.tell(Test1.Command.DONE, ActorRef.noSender());
        akka.terminate();
    }
}
package actor.akka.classic.receivebuilder;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;

public class Test1 extends AbstractActor {
    public static enum Command {
        ACK,
        DONE
    }
    public static class Message {
        private String text;
        public Message(String text) {
            this.text = text;
        }
        public String getText() {
            return text;
        }
    }
    private ActorRef test2;
    @Override
    public void preStart() {
        test2 = getContext().actorOf(Props.create(Test2.class));
    }
    @Override
    public Receive createReceive() {
        return receiveBuilder().matchEquals(Command.ACK, emsg -> {
            System.out.printf("test1: ACK from test2  (thread: %d)\n", Thread.currentThread().getId());
        }).matchEquals(Command.DONE, emsg -> {
            test2.tell(Test2.Command.DONE, getSelf());
            getContext().stop(getSelf());
        }).match(Message.class, msg -> {
            System.out.printf("test1: %s (thread: %d)\n", msg.getText(), Thread.currentThread().getId());
            test2.tell(new Test2.Message(msg.getText(), getSelf()), getSelf());
        }).build();
    }
}
package actor.akka.classic.receivebuilder;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;

public class Test2 extends AbstractActor {
    public static enum Command {
        DONE
    }
    public static class Message {
        private String text;
        private ActorRef sender;
        public Message(String text, ActorRef sender) {
            this.text = text;
            this.sender = sender;
        }
        public String getText() {
            return text;
        }
        public ActorRef getSender() {
            return sender;
        }
    }
    @Override
    public Receive createReceive() {
        return receiveBuilder().matchEquals(Command.DONE, emsg -> {
            getContext().stop(getSelf());
        }).match(Message.class, msg -> {
            System.out.printf("test2: %s (thread: %d)\n", msg.getText(), Thread.currentThread().getId());
            msg.getSender().tell(Test1.Command.ACK, getSelf());
        }).build();
    }
}
package actor.akka.classic.receivebuilder;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class TestMain {
    public static void main(String[] args) throws InterruptedException {
        System.out.printf("main (thread: %d)\n", Thread.currentThread().getId());
        ActorSystem akka = ActorSystem.create();
        ActorRef test1 = akka.actorOf(Props.create(Test1.class));
        test1.tell(new Test1.Message("Hello world!"), ActorRef.noSender());
        test1.tell(new Test1.Message("Hello world!"), ActorRef.noSender());
        test1.tell(new Test1.Message("Hello world!"), ActorRef.noSender());
        Thread.sleep(100); // wait for everything to complete
        test1.tell(Test1.Command.DONE, ActorRef.noSender());
        akka.terminate();
    }
}
package actor.akka.classic.typed;

public interface Test1 {
    void ack();
    void done();
    void message(String msg);
}
package actor.akka.classic.typed;

import akka.actor.TypedActor;
import akka.actor.TypedProps;

public class Test1Impl implements Test1 {
    private Test2 test2;
    public Test1Impl() {
        test2 = TypedActor.get(TypedActor.context()).typedActorOf(new TypedProps<Test2Impl>(Test2.class, Test2Impl.class));
    }
    @Override
    public void ack() {
        System.out.printf("test1: ACK from test2  (thread: %d)\n", Thread.currentThread().getId());
    }
    @Override
    public void done() {
        test2.done();
        TypedActor.get(TypedActor.context()).stop(this);

    }
    @Override
    public void message(String msg) {
        System.out.printf("test1: %s (thread: %d)\n", msg, Thread.currentThread().getId());
        test2.message(msg, TypedActor.self());
    }
}
package actor.akka.classic.typed;

public interface Test2 {
    void done();
    void message(String msg, Test1 sender);
}
package actor.akka.classic.typed;

import akka.actor.TypedActor;

public class Test2Impl implements Test2 {
    @Override
    public void done() {
        TypedActor.get(TypedActor.context()).stop(this);
    }
    @Override
    public void message(String msg, Test1 sender) {
        System.out.printf("test2: %s (thread: %d)\n", msg, Thread.currentThread().getId());
        sender.ack();
    }
}
package actor.akka.classic.typed;

import akka.actor.ActorSystem;
import akka.actor.TypedActor;
import akka.actor.TypedProps;

public class TestMain {
    public static void main(String[] args) throws InterruptedException {
        System.out.printf("main (thread: %d)\n", Thread.currentThread().getId());
        ActorSystem akka = ActorSystem.create();
        Test1 test1 = TypedActor.get(akka).typedActorOf(new TypedProps<Test1Impl>(Test1.class, Test1Impl.class));
        test1.message("Hello world!");
        test1.message("Hello world!");
        test1.message("Hello world!");
        Thread.sleep(100); // wait for everything to complete
        test1.done();
        akka.terminate();
    }
}
package actor.akka.typed.tell;

import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;

public class Test1 extends AbstractBehavior<Test1.Message> {
    public static abstract class Message {
    }
    public static class CommandMessage extends Message {
        public static final CommandMessage ACK = new CommandMessage();
        public static final CommandMessage DONE = new CommandMessage();
    }
    public static class TextMessage extends Message {
        private String text;
        public TextMessage(String text) {
            this.text = text;
        }
        public String getText() {
            return text;
        }
        
    }
    public static Behavior<Message> create() {
        return Behaviors.setup(Test1::new);
    }
    private ActorRef<Test2.Message> test2;
    public Test1(ActorContext<Message> ctx) {
        super(ctx);
        test2 = ctx.spawn(Test2.create(), "Test2");
    }
    @Override
    public Receive<Message> createReceive() {
        return newReceiveBuilder().onMessageEquals(CommandMessage.ACK, () -> {
            System.out.printf("test1: ACK from test2  (thread: %d)\n", Thread.currentThread().getId());
            return this;
        }).onMessageEquals(CommandMessage.DONE, () -> {
            test2.tell(Test2.CommandMessage.DONE);
            Behaviors.stopped();
            return this;
        }).onMessage(TextMessage.class, msg -> {
            System.out.printf("test1: %s (thread: %d)\n", msg.getText(), Thread.currentThread().getId());
            test2.tell(new Test2.TextMessage(msg.getText(), getContext().getSelf()));
            return this;
        }).build();
    }
}
package actor.akka.typed.tell;

import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;

public class Test2 extends AbstractBehavior<Test2.Message>  {
    public static abstract class Message {
    }
    public static class CommandMessage extends Message {
        public static final CommandMessage DONE = new CommandMessage();
    }
    public static class TextMessage extends Message {
        private String text;
        private ActorRef<Test1.Message> sender;
        public TextMessage(String text, ActorRef<Test1.Message> sender) {
            this.text = text;
            this.sender = sender;
        }
        public String getText() {
            return text;
        }
        public ActorRef<Test1.Message> getSender() {
            return sender;
        }
    }
    public static Behavior<Message> create() {
        return Behaviors.setup(Test2::new);
    }
    public Test2(ActorContext<Message> ctx) {
        super(ctx);
    }
    @Override
    public Receive<Message> createReceive() {
        return newReceiveBuilder().onMessageEquals(CommandMessage.DONE, () -> {
            Behaviors.stopped();
            return this;
        }).onMessage(TextMessage.class, msg -> {
            System.out.printf("test2: %s (thread: %d)\n", msg.getText(), Thread.currentThread().getId());
            msg.getSender().tell(Test1.CommandMessage.ACK);
            return this;
        }).build();
    }
}
package actor.akka.typed.tell;

import akka.actor.typed.ActorSystem;

public class TestMain {
    public static void main(String[] args) throws InterruptedException {
        System.out.printf("main (thread: %d)\n", Thread.currentThread().getId());
        ActorSystem<Test1.Message> test1 = ActorSystem.create(Test1.create(), "Test1");
        test1.tell(new Test1.TextMessage("Hello world!"));
        test1.tell(new Test1.TextMessage("Hello world!"));
        test1.tell(new Test1.TextMessage("Hello world!"));
        Thread.sleep(100); // wait for everything to complete
        test1.tell(Test1.CommandMessage.DONE);
        test1.terminate();
        System.exit(0);
    }
}
using System;
using System.Threading;

using Akka.Actor;

namespace Actor.Akka.Untyped
{
    public class Test2 : UntypedActor
    {
        public enum Command { DONE }
        public class Message
        {
            public string Text { get; set; }
            public IActorRef Sender { get; set; }
        }
        protected override void OnReceive(object o)
        {
            if (o.Equals(Command.DONE))
            {
                Self.GracefulStop(TimeSpan.FromMilliseconds(100));
            }
            else if (o is Message)
            {
                Message msg = (Message)o;
                Console.WriteLine("Test2: {0} (thread: {1})", msg.Text, Thread.CurrentThread.ManagedThreadId);
                msg.Sender.Tell(Test1.Command.ACK);
            }
            else
            {
                Console.WriteLine("Unknown message: {0} ({1})", o.ToString(), o.GetType().FullName);
            }
        }
    }

    public class Test1 : UntypedActor
    {
        public enum Command { ACK, DONE }
        public class Message
        {
            public string Text { get; set; }
        }
        private IActorRef test2;
        protected override void PreStart()
        {
            test2 = Context.ActorOf(Props.Create(typeof(Test2)));
        }
        protected override void OnReceive(object o)
        {
            if(o.Equals(Command.ACK))
            {
                Console.WriteLine("Test1: ACK from Test2 (thread: {0})", Thread.CurrentThread.ManagedThreadId);
            }
            else if(o.Equals(Command.DONE))
            {
                Self.GracefulStop(TimeSpan.FromMilliseconds(100));
            }
            else if(o is Message)
            {
                Message msg = (Message)o;
                Console.WriteLine("Test1: {0} (thread: {1})", msg.Text, Thread.CurrentThread.ManagedThreadId);
                test2.Tell(new Test2.Message { Text = msg.Text, Sender = Self });
            }
            else
            {
                Console.WriteLine("Unknown nessage: {0} ({1})", o.ToString(), o.GetType().FullName);
            }
        }
    }
    public class Program
    {
        public static void Main(string[] args)
        {
            Console.WriteLine("main (thread: {0})", Thread.CurrentThread.ManagedThreadId);
            ActorSystem akka = ActorSystem.Create("Demo");
            IActorRef test1 = akka.ActorOf(Props.Create(typeof(Test1)));
            test1.Tell(new Test1.Message { Text = "Hello world!" });
            test1.Tell(new Test1.Message { Text = "Hello world!" });
            test1.Tell(new Test1.Message { Text = "Hello world!" });
            Thread.Sleep(100); // wait for everything to complete
            test1.Tell(Test1.Command.DONE);
            akka.Terminate();
        }
    }
}
using System;
using System.Threading;

using Akka.Actor;

namespace Actor.Akka.Receiver.Tell
{
    public class Test2 : ReceiveActor
    {
        public enum Command { DONE }
        public class Message
        {
            public string Text { get; set; }
            public IActorRef Sender { get; set; }
        }
        public Test2()
        {
            Receive<Command>((Command cmd) => {
                switch(cmd)
                {
                    case Command.DONE:
                        Self.GracefulStop(TimeSpan.FromMilliseconds(100));
                        break;
                }
                
            });
            Receive<Message>((Message msg) => {
                Console.WriteLine("Test2: {0} (thread: {1})", msg.Text, Thread.CurrentThread.ManagedThreadId);
                msg.Sender.Tell(Test1.Command.ACK);
            });
        }
    }
    public class Test1 : ReceiveActor
    {
        public enum Command { ACK, DONE }
        public class Message
        {
            public string Text { get; set; }
        }
        private IActorRef test2;
        protected override void PreStart()
        {
            test2 = Context.ActorOf(Props.Create(typeof(Test2)));
        }
        public Test1()
        {
            Receive<Command>((Command cmd) => {
                switch (cmd)
                {
                    case Command.ACK:
                        Console.WriteLine("Test1: ACK from Test2 (thread: {0})", Thread.CurrentThread.ManagedThreadId);
                        break;
                    case Command.DONE:
                        Self.GracefulStop(TimeSpan.FromMilliseconds(100));
                        break;
                }

            });
            Receive<Message>((Message msg) => {
                Console.WriteLine("Test1: {0} (thread: {1})", msg.Text, Thread.CurrentThread.ManagedThreadId);
                test2.Tell(new Test2.Message { Text = msg.Text, Sender = Self });
            });
        }
    }
    public class Program
    {
        public static void Main(string[] args)
        {
            Console.WriteLine("main (thread: {0})", Thread.CurrentThread.ManagedThreadId);
            ActorSystem akka = ActorSystem.Create("Demo");
            IActorRef test1 = akka.ActorOf(Props.Create(typeof(Test1)));
            test1.Tell(new Test1.Message { Text = "Hello world!" });
            test1.Tell(new Test1.Message { Text = "Hello world!" });
            test1.Tell(new Test1.Message { Text = "Hello world!" });
            Thread.Sleep(100); // wait for everything to complete
            test1.Tell(Test1.Command.DONE);
            akka.Terminate();
        }
    }
}
using System;
using System.Threading;
using System.Threading.Tasks;

using Akka.Actor;

namespace Actor.Akka.Receiver.Ask
{
    public class Test2 : ReceiveActor
    {
        public enum Command { DONE }
        public class Message
        {
            public string Text { get; set; }
            public IActorRef Sender { get; set; }
        }
        public Test2()
        {
            Receive<Command>((Command cmd) => {
                switch (cmd)
                {
                    case Command.DONE:
                        Self.GracefulStop(TimeSpan.FromMilliseconds(100));
                        break;
                }

            });
            Receive<Message>((Message msg) => {
                Console.WriteLine("Test2: {0} (thread: {1})", msg.Text, Thread.CurrentThread.ManagedThreadId);
                //msg.Sender.Tell(Test1.Command.ACK);
                Sender.Tell(Test1.Command.ACK);
            });
        }
    }
    public class Test1 : ReceiveActor
    {
        public enum Command { ACK, DONE }
        public class Message
        {
            public string Text { get; set; }
        }
        private IActorRef test2;
        protected override void PreStart()
        {
            test2 = Context.ActorOf(Props.Create(typeof(Test2)));
        }
        public Test1()
        {
            Receive<Command>((Command cmd) => {
                switch (cmd)
                {
                    case Command.DONE:
                        Self.GracefulStop(TimeSpan.FromMilliseconds(100));
                        break;
                }

            });
            Receive<Message>((Message msg) => {
                Console.WriteLine("Test1: {0} (thread: {1})", msg.Text, Thread.CurrentThread.ManagedThreadId);
                Task<Command> fut = test2.Ask<Command>(new Test2.Message { Text = msg.Text, Sender = Self }, TimeSpan.FromMilliseconds(100));
                Console.WriteLine("Reply recieved: {0} (thread: {1})", fut.Result, Thread.CurrentThread.ManagedThreadId);
            });
        }
    }
    public class Program
    {
        public static void Main(string[] args)
        {
            Console.WriteLine("main (thread: {0})", Thread.CurrentThread.ManagedThreadId);
            ActorSystem akka = ActorSystem.Create("Demo");
            IActorRef test1 = akka.ActorOf(Props.Create(typeof(Test1)));
            test1.Tell(new Test1.Message { Text = "Hello world!" });
            test1.Tell(new Test1.Message { Text = "Hello world!" });
            test1.Tell(new Test1.Message { Text = "Hello world!" });
            Thread.Sleep(1000); // wait for everything to complete
            test1.Tell(Test1.Command.DONE);
            akka.Terminate();
        }
    }
}
import threading
import time
from pykka import *


TEST1_ACK = '__ACK__'
TEST1_DONE = '__DONE__'

TEST2_DONE = '__DONE__'

class Test2(ThreadingActor):
    def __init__(self, test1):
        super().__init__()
        self.test1 = test1
    def on_receive(self, msg):
        if msg == TEST2_DONE:
            self.stop()
        else:
            print('test2: %s (thread: %d)' % (msg, threading.get_ident()))
            self.test1.tell(TEST1_ACK)

class Test1(ThreadingActor):
    def __init__(self):
        super().__init__()
        self.test2 = Test2.start(test1=self.actor_ref)
    def on_receive(self, msg):
        if msg == TEST1_ACK:
            print('test1: ACK from test2 (thread: %d)' % (threading.get_ident())) 
        elif msg == TEST1_DONE:
            self.test2.tell(TEST2_DONE)
            self.stop()
        else:
            print('test1: %s (thread: %d)' % (msg, threading.get_ident()))
            self.test2.tell(msg)

print('main (thread: %d)' % (threading.get_ident()))
test1 = Test1.start()
test1.tell('Hello wold!')
test1.tell('Hello wold!')
test1.tell('Hello wold!')
time.sleep(0.1)
test1.tell(TEST1_DONE)
    
import threading
import time
from pykka import *


TEST1_DONE = '__DONE__'

TEST2_DONE = '__DONE__'

class Test2(ThreadingActor):
    def on_receive(self, msg):
        if msg == TEST2_DONE:
            self.stop()
        else:
            print('test2: %s (thread: %d)' % (msg, threading.get_ident()))
            return 'ACK'

class Test1(ThreadingActor):
    def __init__(self):
        super().__init__()
        self.test2 = Test2.start()
    def on_receive(self, msg):
        if msg == TEST1_DONE:
            self.test2.tell(TEST2_DONE)
            self.stop()
        else:
            print('test1: %s (thread: %d)' % (msg, threading.get_ident()))
            replymsg = self.test2.ask(msg)
            print('reply received: %s (thread: %d)' % (replymsg, threading.get_ident()))

print('main (thread: %d)' % (threading.get_ident()))
test1 = Test1.start()
test1.tell('Hello wold!')
test1.tell('Hello wold!')
test1.tell('Hello wold!')
time.sleep(0.1)
test1.tell(TEST1_DONE)
    
import threading
import time
from pykka import *

TEST1_DONE = '__DONE__'

TEST2_DONE = '__DONE__'

class Test2(ThreadingActor):
    def on_receive(self, msg):
        if msg == TEST2_DONE:
            self.stop()
    def something(self, msg):
        print('test2: %s (thread: %d)' % (msg, threading.get_ident()))
        return 'ACK'

class Test1(ThreadingActor):
    def __init__(self):
        super().__init__()
        self.test2 = Test2.start()
    def on_receive(self, msg):
        if msg == TEST1_DONE:
            self.test2.tell(TEST2_DONE)
            self.stop()
        else:
            print('test1: %s (thread: %d)' % (msg, threading.get_ident()))
            replymsg = self.test2.proxy().something(msg).get()
            print('reply received: %s (thread: %d)' % (replymsg, threading.get_ident()))

print('main (thread: %d)' % (threading.get_ident()))
test1 = Test1.start()
test1.tell('Hello wold!')
test1.tell('Hello wold!')
test1.tell('Hello wold!')
time.sleep(0.1)
test1.tell(TEST1_DONE)
    
#include <stdio.h>
#include <string.h>

#include <czmq.h>

#ifdef _WIN32
#define THREADID GetCurrentThreadId
#else
#define THREADID pthread_self
#endif

#define TEST2_DONE "__DONE__"

#define TEST1_ACK "__ACK__"
#define TEST1_DONE "__DONE__"

struct context
{
    zactor_t *actor;
};

typedef struct context context_t;

void test2(zsock_t *pipe, void *args)
{   
    zactor_t *sender;
    int done;
    char *msg;
    zsock_signal(pipe, 0);
    done = 0;
    while(!done)
    {
        msg = zstr_recv(pipe);
        if(strcmp(msg, TEST2_DONE) == 0 || strcmp(msg, "$TERM") == 0)
        {
            done = 1;
        }
        else
        {
            printf("Test2: %s (thread: %d)\n", msg, THREADID());
            sender = ((context_t *)args)->actor;
            zstr_send(sender, TEST1_ACK);
        }
        zstr_free(&msg);
    }
}

void test1(zsock_t *pipe, void *args)
{   
    zactor_t *test2_actor;
    int done;
    char *msg;
    test2_actor = zactor_new(test2, args);
    zsock_signal(pipe, 0);
    done = 0;
    while(!done)
    {
        msg = zstr_recv(pipe);
        if(strcmp(msg, TEST1_ACK) == 0)
        {
            printf("Test1: ACK from Test2 (thread: %d)\n", THREADID());
        }
        else if(strcmp(msg, TEST1_DONE) == 0 || strcmp(msg, "$TERM") == 0)
        {
            zstr_send(test2_actor, TEST2_DONE);
            done = 1;
            zactor_destroy(&test2_actor);
        }
        else
        {
            printf("Test1: %s (thread: %d)\n", msg, THREADID());
            zstr_send(test2_actor, msg);
        }
        zstr_free(&msg);
    }
}

int main(void)
{
    context_t ctx;
    zactor_t *test1_actor;
    printf("main (thread: %d)\n", THREADID());
    test1_actor = zactor_new(test1, &ctx);
    ctx.actor = test1_actor;
    zstr_send(test1_actor, "Hello world!");
    zstr_send(test1_actor, "Hello world!");
    zstr_send(test1_actor, "Hello world!");
    zclock_sleep(100);
    zstr_send(test1_actor, TEST1_DONE);
    zactor_destroy(&test1_actor);
    return 0;
}
#include <iostream>
#include <string>

using namespace std;

#include "caf/all.hpp"

using namespace caf;

#ifdef _WIN32
#define THREADID_T int
#define THREADID GetCurrentThreadId
#define SLEEP_MULT 1000
#define SLEEP Sleep
#else
#define THREADID_T pthread_t
#define THREADID pthread_self
#define SLEEP_MULT 1
#define SLEEP sleep
#endif
extern "C"
{
    THREADID_T THREADID();
    void SLEEP(int t);
}

const char* TEST2_DONE = "__DONE__";

const char* TEST1_ACK = "__ACK__";
const char* TEST1_DONE = "__DONE__";

behavior test2(event_based_actor* self)
{
    return
    {
        [=](const string& msg)
        {
            if(msg == TEST1_DONE)
            {
                self->quit();
            }
            else
            {
                aout(self) << "test2: " << msg << " (thread: " << THREADID() << ")" << endl;
                self->send(actor_cast<event_based_actor*>(self->current_sender()), TEST1_ACK);
            }
        },
    };
}

behavior test1(event_based_actor* self)
{
    auto test2_actor = self->spawn(test2);
    return
    {
        [=](const string& msg)
        {
            if(msg == TEST1_ACK)
            {
                aout(self) << "test1: ACK received"  << " (thread: " << THREADID() << ")" << endl;
            }
            else if(msg == TEST1_DONE)
            {
                self->send(test2_actor, TEST2_DONE);
                self->quit();
            }
            else
            {
                aout(self) << "test1: " << msg  << " (thread: " << THREADID() << ")" << endl;
                self->send(test2_actor, msg);
            }
        },
    };
}

void caf_main(actor_system& sys)
{
    auto test1_actor = sys.spawn(test1);
    scoped_actor self { sys };
    aout(self) << "main (thread: " << THREADID() << ")" << endl;
    self->send(test1_actor, "Hello world!");
    SLEEP(1 * SLEEP_MULT);
    self->send(test1_actor, TEST1_DONE);
}

CAF_MAIN()
#include <iostream>
#include <string>

using namespace std;

#include "caf/all.hpp"

using namespace caf;

#ifdef _WIN32
#define THREADID_T int
#define THREADID GetCurrentThreadId
#define SLEEP_MULT 1000
#define SLEEP Sleep
#else
#define THREADID_T pthread_t
#define THREADID pthread_self
#define SLEEP_MULT 1
#define SLEEP sleep
#endif
extern "C"
{
    THREADID_T THREADID();
    void SLEEP(int t);
}

CAF_BEGIN_TYPE_ID_BLOCK(demo, first_custom_type_id)
  CAF_ADD_ATOM(demo, test2_done)
  CAF_ADD_ATOM(demo, test1_ack)
  CAF_ADD_ATOM(demo, test1_done)
CAF_END_TYPE_ID_BLOCK(demo)

behavior test2(event_based_actor* self)
{
    return
    {
        [=](test2_done)
        {
            self->quit();
        },
        [=](const string& msg)
        {
            aout(self) << "test2: " << msg << " (thread: " << THREADID() << ")" << endl;
            self->send(actor_cast<event_based_actor*>(self->current_sender()), test1_ack_v);
        },
    };
}

behavior test1(event_based_actor* self)
{
    auto test2_actor = self->spawn(test2);
    return
    {
        [=](test1_ack)
        {
            aout(self) << "test1: ACK received"  << " (thread: " << THREADID() << ")" << endl;
        },
        [=](test1_done)
        {
            self->send(test2_actor, test2_done_v);
            self->quit();
        },
        [=](const string& msg)
        {
            aout(self) << "test1: " << msg  << " (thread: " << THREADID() << ")" << endl;
            self->send(test2_actor, msg);
        },
    };
}

void caf_main(actor_system& sys)
{
    auto test1_actor = sys.spawn(test1);
    scoped_actor self { sys };
    aout(self) << "main (thread: " << THREADID() << ")" << endl;
    self->send(test1_actor, "Hello world!");
    SLEEP(1 * SLEEP_MULT);
    self->send(test1_actor, test1_done_v);
}

CAF_MAIN(id_block::demo)
#include <iostream>
#include <string>

using namespace std;

#include "caf/all.hpp"

using namespace caf;

#ifdef _WIN32
#define THREADID_T int
#define THREADID GetCurrentThreadId
#define SLEEP_MULT 1000
#define SLEEP Sleep
#else
#define THREADID_T pthread_t
#define THREADID pthread_self
#define SLEEP_MULT 1
#define SLEEP sleep
#endif
extern "C"
{
    THREADID_T THREADID();
    void SLEEP(int t);
}

const char* TEST2_DONE = "__DONE__";

const char* TEST1_ACK = "__ACK__";
const char* TEST1_DONE = "__DONE__";

void test2(blocking_actor* self)
{
    bool run = true;
    self->receive_while(run)(
        [&](const string& msg)
        {
            if(msg == TEST2_DONE)
            {
                run = false;
            }
            else
            {
                aout(self) << "test2: " << msg << " (thread: " << THREADID() << ")" << endl;
                self->send(actor_cast<event_based_actor*>(self->current_sender()), TEST1_ACK);
            }
        }
    );
}

void test1(blocking_actor* self) {
    auto test2_actor = self->spawn(test2);
    bool run = true;
    self->receive_while(run)(
        [&](const string& msg)
        {
            if(msg == TEST1_ACK)
            {
                aout(self) << "test1: ACK received"  << " (thread: " << THREADID() << ")" << endl;
            }
            else if(msg == TEST1_DONE)
            {
                self->send(test2_actor, TEST2_DONE);
                run = false;
            }
            else
            {
                aout(self) << "test1: " << msg  << " (thread: " << THREADID() << ")" << endl;
                self->send(test2_actor, msg);
            }
        }
    );
}

void caf_main(actor_system& sys) {
    auto test1_actor = sys.spawn(test1);
    scoped_actor self { sys };
    aout(self) << "main (thread: " << THREADID() << ")" << endl;
    self->send(test1_actor, "Hello world!");
    SLEEP(1 * SLEEP_MULT);
    self->send(test1_actor, TEST1_DONE);
}

CAF_MAIN()
#include <iostream>
#include <string>

using namespace std;

#include "caf/all.hpp"

using namespace caf;

#ifdef _WIN32
#define THREADID_T int
#define THREADID GetCurrentThreadId
#define SLEEP_MULT 1000
#define SLEEP Sleep
#else
#define THREADID_T pthread_t
#define THREADID pthread_self
#define SLEEP_MULT 1
#define SLEEP sleep
#endif
extern "C"
{
    THREADID_T THREADID();
    void SLEEP(int t);
}

CAF_BEGIN_TYPE_ID_BLOCK(demo, first_custom_type_id)
  CAF_ADD_ATOM(demo, test2_done)
  CAF_ADD_ATOM(demo, test1_ack)
  CAF_ADD_ATOM(demo, test1_done)
CAF_END_TYPE_ID_BLOCK(demo)

void test2(blocking_actor* self)
{
    bool run = true;
    self->receive_while(run)(
        [&](test2_done)
        {
            run = false;
        },
        [&](const string& msg)
        {
            aout(self) << "test2: " << msg << " (thread: " << THREADID() << ")" << endl;
            self->send(actor_cast<event_based_actor*>(self->current_sender()), test1_ack_v);
        }
    );
}

void test1(blocking_actor* self)
{
    auto test2_actor = self->spawn(test2);
    bool run = true;
    self->receive_while(run)(
        [&](test1_ack)
        {
            aout(self) << "test1: ACK received"  << " (thread: " << THREADID() << ")" << endl;
        },
        [&](test1_done)
        {
            self->send(test2_actor, test2_done_v);
            run = false;
        },
        [&](const string& msg)
        {
            aout(self) << "test1: " << msg  << " (thread: " << THREADID() << ")" << endl;
            self->send(test2_actor, msg);
        }
    );
}

void caf_main(actor_system& sys) {
    auto test1_actor = sys.spawn(test1);
    scoped_actor self { sys };
    aout(self) << "main (thread: " << THREADID() << ")" << endl;
    self->send(test1_actor, "Hello world!");
    SLEEP(1 * SLEEP_MULT);
    self->send(test1_actor, test1_done_v);
}

CAF_MAIN(id_block::demo)
#include <iostream>
#include <string>

using namespace std;

#include "caf/all.hpp"

using namespace caf;

#ifdef _WIN32
#define THREADID_T int
#define THREADID GetCurrentThreadId
#define SLEEP_MULT 1000
#define SLEEP Sleep
#else
#define THREADID_T pthread_t
#define THREADID pthread_self
#define SLEEP_MULT 1
#define SLEEP sleep
#endif
extern "C"
{
    THREADID_T THREADID();
    void SLEEP(int t);
}

const char* TEST2_DONE = "__DONE__";

const char* TEST1_ACK = "__ACK__";
const char* TEST1_DONE = "__DONE__";

behavior test2(event_based_actor* self)
{
    return
    {
        [=](const string& msg)
        {
            if(msg == TEST1_DONE)
            {
                self->quit();
            }
            else
            {
                aout(self) << "test2: " << msg << " (thread: " << THREADID() << ")" << endl;
                self->send(actor_cast<event_based_actor*>(self->current_sender()), TEST1_ACK);
            }
        },
    };
}

class test2_actorclass : public event_based_actor {
    public:
        test2_actorclass(actor_config& cfg) : event_based_actor(cfg) { }
        behavior make_behavior() override {
            return test2(this);
        }
};

behavior test1(event_based_actor* self)
{
    auto test2_actor = self->spawn<test2_actorclass>();
    return
    {
        [=](const string& msg)
        {
            if(msg == TEST1_ACK)
            {
                aout(self) << "test1: ACK received"  << " (thread: " << THREADID() << ")" << endl;
            }
            else if(msg == TEST1_DONE)
            {
                self->send(test2_actor, TEST2_DONE);
                self->quit();
            }
            else
            {
                aout(self) << "test1: " << msg  << " (thread: " << THREADID() << ")" << endl;
                self->send(test2_actor, msg);
            }
        },
    };
}

class test1_actorclass : public event_based_actor {
    public:
        test1_actorclass(actor_config& cfg) : event_based_actor(cfg) { }
        behavior make_behavior() override {
            return test1(this);
        }
};

void caf_main(actor_system& sys)
{
    auto test1_actor = sys.spawn<test1_actorclass>();
    scoped_actor self { sys };
    aout(self) << "main (thread: " << THREADID() << ")" << endl;
    self->send(test1_actor, "Hello world!");
    SLEEP(1 * SLEEP_MULT);
    self->send(test1_actor, TEST1_DONE);
}

CAF_MAIN()
#include <iostream>
#include <string>

using namespace std;

#include "caf/all.hpp"

using namespace caf;

#ifdef _WIN32
#define THREADID_T int
#define THREADID GetCurrentThreadId
#define SLEEP_MULT 1000
#define SLEEP Sleep
#else
#define THREADID_T pthread_t
#define THREADID pthread_self
#define SLEEP_MULT 1
#define SLEEP sleep
#endif
extern "C"
{
    THREADID_T THREADID();
    void SLEEP(int t);
}

CAF_BEGIN_TYPE_ID_BLOCK(demo, first_custom_type_id)
  CAF_ADD_ATOM(demo, test2_done)
  CAF_ADD_ATOM(demo, test1_ack)
  CAF_ADD_ATOM(demo, test1_done)
CAF_END_TYPE_ID_BLOCK(demo)

behavior test2(event_based_actor* self)
{
    return
    {
        [=](test2_done)
        {
            self->quit();
        },
        [=](const string& msg)
        {
            aout(self) << "test2: " << msg << " (thread: " << THREADID() << ")" << endl;
            self->send(actor_cast<event_based_actor*>(self->current_sender()), test1_ack_v);
        },
    };
}

class test2_actorclass : public event_based_actor {
    public:
        test2_actorclass(actor_config& cfg) : event_based_actor(cfg) { }
        behavior make_behavior() override {
            return test2(this);
        }
};

behavior test1(event_based_actor* self)
{
    auto test2_actor = self->spawn<test2_actorclass>();
    return
    {
        [=](test1_ack)
        {
            aout(self) << "test1: ACK received"  << " (thread: " << THREADID() << ")" << endl;
        },
        [=](test1_done)
        {
            self->send(test2_actor, test2_done_v);
            self->quit();
        },
        [=](const string& msg)
        {
            aout(self) << "test1: " << msg  << " (thread: " << THREADID() << ")" << endl;
            self->send(test2_actor, msg);
        },
    };
}

class test1_actorclass: public event_based_actor {
    public:
        test1_actorclass(actor_config& cfg) : event_based_actor(cfg) { }
        behavior make_behavior() override {
            return test1(this);
        }
};

void caf_main(actor_system& sys)
{
    auto test1_actor = sys.spawn<test1_actorclass>();
    scoped_actor self { sys };
    aout(self) << "main (thread: " << THREADID() << ")" << endl;
    self->send(test1_actor, "Hello world!");
    SLEEP(1 * SLEEP_MULT);
    self->send(test1_actor, test1_done_v);
}

CAF_MAIN(id_block::demo)
#include <iostream>
#include <string>

using namespace std;

#include "caf/all.hpp"

using namespace caf;

#ifdef _WIN32
#define THREADID_T int
#define THREADID GetCurrentThreadId
#define SLEEP_MULT 1000
#define SLEEP Sleep
#else
#define THREADID_T pthread_t
#define THREADID pthread_self
#define SLEEP_MULT 1
#define SLEEP sleep
#endif
extern "C"
{
    THREADID_T THREADID();
    void SLEEP(int t);
}

const char* TEST2_DONE = "__DONE__";

const char* TEST1_ACK = "__ACK__";
const char* TEST1_DONE = "__DONE__";

void test2(blocking_actor* self)
{
    bool run = true;
    self->receive_while(run)(
        [&](const string& msg)
        {
            if(msg == TEST2_DONE)
            {
                run = false;
            }
            else
            {
                aout(self) << "test2: " << msg << " (thread: " << THREADID() << ")" << endl;
                self->send(actor_cast<event_based_actor*>(self->current_sender()), TEST1_ACK);
            }
        }
    );
}

class test2_actorclass : public blocking_actor {
    public:
        test2_actorclass(actor_config& cfg) : blocking_actor(cfg) { }
        void act() override {
            return test2(this);
        }
};

void test1(blocking_actor* self) {
    auto test2_actor = self->spawn<test2_actorclass>();
    bool run = true;
    self->receive_while(run)(
        [&](const string& msg)
        {
            if(msg == TEST1_ACK)
            {
                aout(self) << "test1: ACK received"  << " (thread: " << THREADID() << ")" << endl;
            }
            else if(msg == TEST1_DONE)
            {
                self->send(test2_actor, TEST2_DONE);
                run = false;
            }
            else
            {
                aout(self) << "test1: " << msg  << " (thread: " << THREADID() << ")" << endl;
                self->send(test2_actor, msg);
            }
        }
    );
}

class test1_actorclass : public blocking_actor {
    public:
        test1_actorclass(actor_config& cfg) : blocking_actor(cfg) { }
        void act() override {
            return test1(this);
        }
};

void caf_main(actor_system& sys) {
    auto test1_actor = sys.spawn<test1_actorclass>();
    scoped_actor self { sys };
    aout(self) << "main (thread: " << THREADID() << ")" << endl;
    self->send(test1_actor, "Hello world!");
    SLEEP(1 * SLEEP_MULT);
    self->send(test1_actor, TEST1_DONE);
}

CAF_MAIN()
#include <iostream>
#include <string>

using namespace std;

#include "caf/all.hpp"

using namespace caf;

#ifdef _WIN32
#define THREADID_T int
#define THREADID GetCurrentThreadId
#define SLEEP_MULT 1000
#define SLEEP Sleep
#else
#define THREADID_T pthread_t
#define THREADID pthread_self
#define SLEEP_MULT 1
#define SLEEP sleep
#endif
extern "C"
{
    THREADID_T THREADID();
    void SLEEP(int t);
}

CAF_BEGIN_TYPE_ID_BLOCK(demo, first_custom_type_id)
  CAF_ADD_ATOM(demo, test2_done)
  CAF_ADD_ATOM(demo, test1_ack)
  CAF_ADD_ATOM(demo, test1_done)
CAF_END_TYPE_ID_BLOCK(demo)

void test2(blocking_actor* self)
{
    bool run = true;
    self->receive_while(run)(
        [&](test2_done)
        {
            run = false;
        },
        [&](const string& msg)
        {
            aout(self) << "test2: " << msg << " (thread: " << THREADID() << ")" << endl;
            self->send(actor_cast<event_based_actor*>(self->current_sender()), test1_ack_v);
        }
    );
}

class test2_actorclass : public blocking_actor {
    public:
        test2_actorclass(actor_config& cfg) : blocking_actor(cfg) { }
        void act() override {
            return test2(this);
        }
};

void test1(blocking_actor* self)
{
    auto test2_actor = self->spawn<test2_actorclass>();
    bool run = true;
    self->receive_while(run)(
        [&](test1_ack)
        {
            aout(self) << "test1: ACK received"  << " (thread: " << THREADID() << ")" << endl;
        },
        [&](test1_done)
        {
            self->send(test2_actor, test2_done_v);
            run = false;
        },
        [&](const string& msg)
        {
            aout(self) << "test1: " << msg  << " (thread: " << THREADID() << ")" << endl;
            self->send(test2_actor, msg);
        }
    );
}

class test1_actorclass : public blocking_actor {
    public:
        test1_actorclass(actor_config& cfg) : blocking_actor(cfg) { }
        void act() override {
            return test1(this);
        }
};

void caf_main(actor_system& sys) {
    auto test1_actor = sys.spawn<test1_actorclass>();
    scoped_actor self { sys };
    aout(self) << "main (thread: " << THREADID() << ")" << endl;
    self->send(test1_actor, "Hello world!");
    SLEEP(1 * SLEEP_MULT);
    self->send(test1_actor, test1_done_v);
}

CAF_MAIN(id_block::demo)

Note that some of these Actor frameworks especially Akka is huge with a lot of functionality. The code above is just demoing the most basic functionality to illustrate actor conecpt. Any serious usage will require more deep study of the actor framework.

For CAF it is not only a huge framework, but I am also not experienced in C++ 17 (and CAF does require C++ 17).

Threading:

The actors are running in separate threads, so there is a threading model (a thread pool) inside the actor system.

To investigate the thread aspect more we will try putting some load on the actor system.

We will test with a model like:

Actor threading test
package actor.akka.classic.perf;

import java.util.concurrent.Semaphore;

import akka.actor.AbstractActor;

public class Controller extends AbstractActor {
    public static class StartMessage {
        private String label;
        private int expected;
        private int avgEffort;
        private Semaphore start;
        private Semaphore finished;
        public StartMessage(String label, int expected, int avgEffort, Semaphore start, Semaphore finished) {
            this.label = label;
            this.expected = expected;
            this.avgEffort = avgEffort;
            this.start = start;
            this.finished = finished;
        }
        public int getExpected() {
            return expected;
        }
        public int getAvgEffort() {
            return avgEffort;
        }
        public String getLabel() {
            return label;
        }
        public Semaphore getStart() {
            return start;
        }
        public Semaphore getFinished() {
            return finished;
        }
    }
    public static class DoneMessage {
    }
    public static class StopMessage {
    }
    private String label;
    private int initial;
    private int remaining;
    private int effort;
    private Semaphore finished;
    private long startTime;
    @Override
    public Receive createReceive() {
        return receiveBuilder().match(StartMessage.class, msg -> {
            label = msg.getLabel();
            initial = msg.getExpected();
            remaining = initial;
            effort = msg.getAvgEffort();
            finished = msg.getFinished();
            startTime = System.currentTimeMillis();
            msg.getStart().release();
        }).match(DoneMessage.class, msg -> {
            remaining--;
            if(remaining <= 0) {
                long endTime = System.currentTimeMillis();
                System.out.printf("%s: %d ms => parallelism level %.1f\n", label,
                                                                           endTime - startTime,
                                                                           initial * effort * 1.0 / (endTime - startTime));
                finished.release();
            }
        }).match(StopMessage.class, msg -> {
            getContext().stop(getSelf());
        }).build();
    }
}
package actor.akka.classic.perf;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;

public class Processor extends AbstractActor {
    public static class TaskMessage {
        private int effort;
        private ActorRef controller;
        public TaskMessage(int effort, ActorRef controller) {
            this.effort = effort;
            this.controller = controller;
        }
        public int getEffort() {
            return effort;
        }
        public ActorRef getController() {
            return controller;
        }
    }
    public static class StopMessage {
    }
    @Override
    public Receive createReceive() {
        return receiveBuilder().match(TaskMessage.class, msg -> {
            Thread.sleep(msg.getEffort());
            msg.getController().tell(new Controller.DoneMessage(), getSelf());
        }).match(StopMessage.class, msg -> {
            getContext().stop(getSelf());
        }).build();
    }
}
package actor.akka.classic.perf;

import java.util.concurrent.Semaphore;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class TestMain {
    private static final int NTASK = 10000;
    private static final int EFFORT = 10;
    private static Semaphore start = new Semaphore(1);
    private static Semaphore finished = new Semaphore(1);
    private static void test(int nactor) throws InterruptedException {
        ActorSystem akka = ActorSystem.create();
        ActorRef ctl = akka.actorOf(Props.create(Controller.class));
        ActorRef[] proc = new ActorRef[nactor];
        for(int i = 0; i < nactor; i++) {
            proc[i] = akka.actorOf(Props.create(Processor.class));
        }
        start.acquire();
        ctl.tell(new Controller.StartMessage(nactor + " actors", NTASK, EFFORT, start, finished), ActorRef.noSender());
        start.acquire();
        finished.acquire();
        for(int j = 0; j < NTASK; j++) {
            proc[j % nactor].tell(new Processor.TaskMessage(EFFORT, ctl), ActorRef.noSender());
        }
        finished.acquire();
        for(int i = 0; i < nactor; i++) {
            proc[i].tell(new Processor.StopMessage(), ActorRef.noSender());
        }
        ctl.tell(new Controller.StopMessage(), ActorRef.noSender());
        Thread.sleep(100);
        akka.terminate();
        start.release();
        finished.release();
    }
    private static final int[] nactors = { 5, 10, 25, 50, 100, 250 };
    public static void main(String[] args) throws InterruptedException {
        for(int nactor : nactors) {
            test(nactor);
        }
    }
}

Output:

5 actors: 31423 ms => parallelism level 3.2
10 actors: 15709 ms => parallelism level 6.4
25 actors: 12569 ms => parallelism level 8.0
50 actors: 12566 ms => parallelism level 8.0
100 actors: 10996 ms => parallelism level 9.1
250 actors: 10047 ms => parallelism level 10.0
using Akka.Actor;
using System;
using System.Threading;

namespace Actor.Akka.Perf
{
    public class Processor : ReceiveActor
    {
        public class TaskMessage
        {
            public int Effort { get; set; }
            public IActorRef Controller { get; set; }
        }
        public class StopMessage
        {
        }
        public Processor()
        {
            Receive<TaskMessage>((TaskMessage msg) => {
                Thread.Sleep(msg.Effort);
                msg.Controller.Tell(new Controller.DoneMessage());
            });
            Receive<StopMessage>((StopMessage msg) => {
                Self.GracefulStop(TimeSpan.FromMilliseconds(100));
            });
        }

    }
    public class Controller : ReceiveActor
    {
        public class StartMessage
        {
            public string Label { get; set; }
            public int Expected { get; set; }
            public int AvgEffort { get; set; }
            public Semaphore Start { get; set; }
            public Semaphore Finished { get; set; }
        }
        public class DoneMessage
        {
        }
        public class StopMessage
        {
        }
        private string label;
        private int initial;
        private int remaining;
        private int effort;
        private Semaphore finished;
        private DateTime startTime;
        public Controller()
        {
            Receive<StartMessage>((StartMessage msg) => {
                label = msg.Label;
                initial = msg.Expected;
                remaining = initial;
                effort = msg.AvgEffort;
                finished = msg.Finished;
                startTime = DateTime.Now;
                msg.Start.Release();
            });
            Receive<DoneMessage>((DoneMessage msg) => {
                remaining--;
                if (remaining <= 0)
                {
                    DateTime endTime = DateTime.Now;
                    Console.WriteLine("{0}: {1} ms => parallelism level {2:F1}", label, (endTime - startTime).TotalMilliseconds, initial * effort *1.0 / (endTime - startTime).TotalMilliseconds);
                    finished.Release();
                }
            });
            Receive<StopMessage>((StopMessage msg) => {
                Self.GracefulStop(TimeSpan.FromMilliseconds(100));
            });
        }
    }
    public class Program
    {
        private const int NTASK = 10000;
        private const int EFFORT = 10;
        private static Semaphore start = new Semaphore(1, 1);
        private static Semaphore finished = new Semaphore(1, 1);
        private static void Test(int nactor)
        {
            ActorSystem akka = ActorSystem.Create("Demo");
            IActorRef ctl = akka.ActorOf(Props.Create(typeof(Controller)));
            IActorRef[] proc = new IActorRef[nactor];
            for (int i = 0; i < nactor; i++)
            {
                proc[i] = akka.ActorOf(Props.Create(typeof(Processor)));
            }
            start.WaitOne();
            ctl.Tell(new Controller.StartMessage { Label = nactor + " actors", Expected = NTASK, AvgEffort = EFFORT, Start = start, Finished = finished });
            start.WaitOne();
            for (int j = 0; j < NTASK; j++)
            {
                proc[j % nactor].Tell(new Processor.TaskMessage { Effort = EFFORT, Controller = ctl });
            }
            finished.WaitOne();
            for (int i = 0; i < nactor; i++)
            {
                proc[i].Tell(new Processor.StopMessage());
            }
            finished.WaitOne();
            ctl.Tell(new Controller.StopMessage());
            Thread.Sleep(100);
            akka.Terminate();
            start.Release();
            finished.Release();
        }
        private static int[] nactors = { 5, 10, 25, 50, 100, 250 };
        public static void Main(string[] args)
        {
            foreach(int nactor in nactors)
            {
                Test(nactor);
            }
        }
    }
}

Output:

5 actors: 33885.6065 ms => parallelism level 3.0
10 actors: 17044.6877 ms => parallelism level 5.9
25 actors: 9336.2951 ms => parallelism level 10.7
50 actors: 9324.3394 ms => parallelism level 10.7
100 actors: 9166.2892 ms => parallelism level 10.9
250 actors: 7846.5161 ms => parallelism level 12.7

The actor system start the number of threads it considers appropriate given the instantiated actors.

Conclusion:

Actors are definitely relevant for some problems.

The complexity of actor frameworks vary. Akka/Akka.NET is a huge framework that takes some effort to learn. Pykka is a relative simple framework to learn.

Article history:

Version Date Description
1.0 August 27th 2023 Initial version
1.1 October 21st 2023 Add CAF examplesn

Other articles:

See list of all articles here

Comments:

Please send comments to Arne Vajhøj