提要:
如果图片显示失败或者想学习更多后端内容或者了解本人更多blog,请浏览未注册的版本 ZealSingerの博客

指南网站

Akka简介

Akka是在JVM上构建的,基于Actor模型的,高并发的,分布式的,容错应用的工具包和运行时,底层使用Scala编写,同时提供了面向Scala和Java的开发接口

Actor模型

在我们实现并发线程的通信方式中,其实总体就是分为两种方式:共享内存+消息传递,在大多数的开发语言中,采用的都是共享内存的形式,这也就存在一个很严重的数据竞争的问题,也是我们常见的八股的来源,所谓的数据并发问题。

而Actor模型,其实就是消息模型,每个Actor在同一时间处理最多一个消息,可以发送消息给其他的Actor,保证了单独写的原则,从而巧妙地避免了多线程地争夺,和共享数据的方式相比,消息传递机制最大的优点就是不会产生数据竞争,常见地方式有基于channel(例如golang)的消息传递和基于Actor(例如erlang)的消息传递

可以简单的理解,一个线程就是一个Actor,每个Actor中存储状态,行为,且每个Actor有且刚好仅有一个MailBox,MailBox相当于一个小型队列,用于存储Sender发送的消息,默认情况下是先进先出FIFO队列,也可以自行配置

每个Actor中不会有任何共享的数据,Actor之间通过消息的方式,一个Actor给另外一个Acotr发送消息,另外一个Acotr收到消息之后进行判断做出行为,且消息传递是异步的

image-20250521161805363

每个Actor = 状态(指actor对象的变量信息,状态由actor自身管理,避免并发环境下的锁和内存原子性问题) + 行为(指代actor中的计算逻辑,通过actor接收到的消息来改变actor的状态) + 消息/邮箱,通过不同的行为决策,可以作到任务分配和并发处理,任务拆解等

image-20250521162744309

一个系统中存在多个Actor相互发消息,但是即使同时有多个Acotr朝一个Actor发送消息,此目标Actor也只能一次处理一个消息,其余的消息就会存储到MailBox中,如果要实现并发处理,一个消息就该对应的多个目标Actor从而实现并发处理

image-20250521163347389

Akka的重要组成

  • akka-actors

    akka的核心,一个用于并发和分发的模型,没有线程原语的痛苦

  • akka-stream

    一种直观而安全的方式来实现异步,非阻塞的回压流处理

  • akka-http

    现代的,快速的,异步的,流的HTTP服务器和客户端

  • akka-sharding

    根据用户的身份,在集群中分配参与者

  • akka-cluster

    通过多个节点上分布我们的系统来获取弹性

  • Distributed Data

    最终一致性,高度读取和写入可用,低延迟数据

  • Akka Persistence

    为参与者的事件包允许他们在重启之后达到相同的状态

  • Akka Management

    云系统上允许Akka系统的拓展(k8s,aws...)

  • Alpakka

    Akka流连接器用于集成其他技术

快速入门

akka-guide/articles/qucikstart-akka-java.md at master · guobinhit/akka-guide

不是很清楚为何我电脑上跑不起来这个代码,但是不妨碍理解和分析

主要是三个类HelloWorldMain ; HelloWorldBot ;HelloWorld

// HelloWorldMain
public class HelloWorldMain extends AbstractBehavior<HelloWorldMain.SayHello> {
  
​
  public static void main(String[] args) throws Exception {
    
    final ActorSystem<SayHello> system =
        ActorSystem.create(HelloWorldMain.create(), "hello");
​
    system.tell(new HelloWorldMain.SayHello("World"));
    system.tell(new HelloWorldMain.SayHello("Akka"));
    
​
    Thread.sleep(3000);
    system.terminate();
  }
  
​
  public static record SayHello(String name) {}
​
  public static Behavior<SayHello> create() {
    return Behaviors.setup(HelloWorldMain::new);
  }
​
  private final ActorRef<HelloWorld.Greet> greeter;
​
  private HelloWorldMain(ActorContext<SayHello> context) {
    super(context);
    greeter = context.spawn(HelloWorld.create(), "greeter");
  }
​
  @Override
  public Receive<SayHello> createReceive() {
    return newReceiveBuilder().onMessage(SayHello.class, this::onSayHello).build();
  }
​
  private Behavior<SayHello> onSayHello(SayHello command) {
    ActorRef<HelloWorld.Greeted> replyTo =
        getContext().spawn(HelloWorldBot.create(3), command.name);
    greeter.tell(new HelloWorld.Greet(command.name, replyTo));
    return this;
  }
}
​
// HelloWorld
public class HelloWorld extends AbstractBehavior<HelloWorld.Greet> {
​
  public static record Greet(String whom, ActorRef<Greeted> replyTo) {}
  public static record Greeted(String whom, ActorRef<Greet> from) {}
​
  public static Behavior<Greet> create() {
    return Behaviors.setup(HelloWorld::new);
  }
​
  private HelloWorld(ActorContext<Greet> context) {
    super(context);
  }
​
  @Override
  public Receive<Greet> createReceive() {
    return newReceiveBuilder().onMessage(Greet.class, this::onGreet).build();
  }
​
  private Behavior<Greet> onGreet(Greet command) {
    getContext().getLog().info("Hello {}!", command.whom);
    command.replyTo.tell(new Greeted(command.whom, getContext().getSelf()));
    return this;
  }
}
​
// HelloWorldBot
public class HelloWorldBot extends AbstractBehavior<HelloWorld.Greeted> {
​
  public static Behavior<HelloWorld.Greeted> create(int max) {
    return Behaviors.setup(context -> new HelloWorldBot(context, max));
  }
​
  private final int max;
  private int greetingCounter;
​
  private HelloWorldBot(ActorContext<HelloWorld.Greeted> context, int max) {
    super(context);
    this.max = max;
  }
​
  @Override
  public Receive<HelloWorld.Greeted> createReceive() {
    return newReceiveBuilder().onMessage(HelloWorld.Greeted.class, this::onGreeted).build();
  }
​
  private Behavior<HelloWorld.Greeted> onGreeted(HelloWorld.Greeted message) {
    greetingCounter++;
    getContext().getLog().info("Greeting {} for {}", greetingCounter, message.from());
    if (greetingCounter == max) {
      return Behaviors.stopped();
    } else {
      message.from().tell(new HelloWorld.Greet(message.whom(), getContext().getSelf()));
      return this;
    }
  }
}
// 最后输出
Hello World!
Greeting 1 for Actor[akka://hello/user/greeter#-123456789]
Hello World!
Greeting 2 for Actor[akka://hello/user/greeter#-123456789]
Hello World!
Greeting 3 for Actor[akka://hello/user/greeter#-123456789]
Hello Akka!
Greeting 1 for Actor[akka://hello/user/greeter#-123456789]
Hello Akka!
Greeting 2 for Actor[akka://hello/user/greeter#-123456789]
Hello Akka!
Greeting 3 for Actor[akka://hello/user/greeter#-123456789]
  1. 消息触发顺序

    • HelloWorldMain 收到 SayHello 消息后,创建 HelloWorldBot 并向 HelloWorld 发送 Greet

    • HelloWorld 处理 Greet 后,回复 GreetedHelloWorldBot,触发循环。

  2. 循环逻辑

    • HelloWorldBot 每收到一次 Greeted 消息,计数器加 1,并重新发送 GreetHelloWorld,直到计数器满 3 次后停止。

  3. 系统层级

    • Actor 之间通过父子关系形成树形结构,确保消息传递和监管的隔离性。

image.png


image-20250521185857483

image-20250521190255023

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐