Akka Actor | 注意 sender 的闭包范围

Akka Actor 里都有一个 sender 方法返回此消息的发送方 Actor,进行消息回复非常方便。但是在使用 sender 时,需要注意一点:不要在可能运行于其它线程的闭包中调用 sender 方法,也就是说一定要在当前 Actor 范围内调用 sender 方法,否则会出现各种问题。正好写 Aquarius 的时候用到了 Actor,就用它演示一下。假设我们收到请求收藏的消息,并调用相关业务逻辑进行异步操作,并向 sender 返回操作结果。假如我们这样写:

1
2
3
4
5
6
7
8
9
10
override def receive: Receive = {
case go @ FavoriteOn(a, u) =>
service ❤ go onComplete {
case Success(x) =>
sender ! FAVORITE_PROCESS_SUCCESS
case Failure(ex) =>
sender ! FAVORITE_PROCESS_FAIL
}
// 其他代码略
}

这是对应的请求逻辑:

1
2
3
4
5
6
7
8
implicit val timeout = Timeout(5 seconds)
def like(aid: Int, uid: Int) = Action.async { implicit request =>
// 省略其它逻辑
(alActor ? FavoriteOn(aid, uid.toInt)).mapTo[ProcessResult] map { reply =>
Ok(Json.toJson(reply))
}
}

我们会发现,发出请求5s后抛出AskTimeoutException异常,表明Actor没有发送回reply:

1
2
3
4
5
6
7
8
9
play.api.http.HttpErrorHandlerExceptions$$anon$1: Execution exception[[AskTimeoutException: Ask timed out on [Actor[akka://application/user/article-favorite-actor#377237056]] after [5000 ms]. Sender[null] sent message of type "message.FavoriteMessageActor$FavoriteOn".]]
at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:269)
at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:195)
at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:99)
at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:98)
...
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://application/user/article-favorite-actor#377237056]] after [5000 ms]. Sender[null] sent message of type "message.FavoriteMessageActor$FavoriteOn".
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
...

在对应Actor中检测sender的path,结果为deadLetters,表明我们此处获得的sender并不是消息发送方。这是因为上面的代码在Future的onComplete闭包中调用了sender方法,而onComplete闭包代码会异步执行(在另一个线程内),从而获得的sender并不是消息接收时对应的sender,行为是未定义,线程不安全的(可能会导致资源争用)。因此我们需要在Actor作用域内获取sender actor:

1
2
3
4
5
6
7
8
9
10
11
override def receive: Receive = {
val sender = super.sender()
case go @ FavoriteOn(a, u) =>
service ❤ go onComplete {
case Success(x) =>
sender ! FAVORITE_PROCESS_SUCCESS
case Failure(ex) =>
sender ! FAVORITE_PROCESS_FAIL
}
// 其他代码略
}

这样就确保 sender 是消息接收时对应的发送方 Actor,从而可以正常的发送 reply。

总结几种典型情况及解决方法:

  • 可变状态,如以下代码会导致未定义行为(由于资源争用),因此在 Actor 中不要使用共享的可变状态(一切都 val),或使用DynamicVariable
1
2
3
4
5
6
7
8
class MyActor extends Actor {
var state = State()
def receive = {
case _ =>
Future { state = newState }
otherActor ? message onSuccess { r => state = r }
}
}
  • Future的 onCompleteonSuccessonFailure。在这些回调中必须正确地引用 sender(在 Actor 作用域内获取或通过self

目前来说,编译器不能在编译期检查出 sender 的问题,因此写代码的时候一定要头脑清醒,注意 sender 的闭包范围。

文章目录