|
@@ -69,7 +69,7 @@ mod ping_pong {
|
|
|
mut self,
|
|
|
msg: Ping,
|
|
|
service: ServiceAddr,
|
|
|
- ) -> TransResult<Self, ClientHandleManual<T>> {
|
|
|
+ ) -> TransResult<Self, (ClientHandleManual<T>, T::OnSendPingReturn)> {
|
|
|
let state = if let Some(state) = self.state.take() {
|
|
|
state
|
|
|
} else {
|
|
@@ -97,9 +97,9 @@ mod ping_pong {
|
|
|
};
|
|
|
if let PingProtocolMsgs::PingReply(reply) = reply_enum {
|
|
|
match state.on_send_ping(reply).await {
|
|
|
- TransResult::Ok(new_state) => {
|
|
|
+ TransResult::Ok((new_state, return_var)) => {
|
|
|
self.state = Some(PingClientState::End(new_state));
|
|
|
- TransResult::Ok(self)
|
|
|
+ TransResult::Ok((self, return_var))
|
|
|
}
|
|
|
TransResult::Abort { from, err } => {
|
|
|
self.state = Some(PingClientState::Client(from));
|
|
@@ -280,10 +280,11 @@ mod ping_pong {
|
|
|
impl Client for ClientImpl {
|
|
|
actor_name!("ping_client");
|
|
|
|
|
|
- type OnSendPingFut = impl Future<Output = TransResult<Self, End>>;
|
|
|
+ type OnSendPingReturn = ();
|
|
|
+ type OnSendPingFut = impl Future<Output = TransResult<Self, (End, ())>>;
|
|
|
fn on_send_ping(self, _msg: PingReply) -> Self::OnSendPingFut {
|
|
|
self.counter.fetch_sub(1, Ordering::SeqCst);
|
|
|
- ready(TransResult::Ok(End))
|
|
|
+ ready(TransResult::Ok((End, ())))
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -414,12 +415,16 @@ mod client_callback {
|
|
|
impl Unregistered for UnregisteredState {
|
|
|
actor_name!("callback_client");
|
|
|
|
|
|
+ type OnSendRegisterReturn = ();
|
|
|
type OnSendRegisterRegistered = RegisteredState;
|
|
|
- type OnSendRegisterFut = Ready<TransResult<Self, Self::OnSendRegisterRegistered>>;
|
|
|
+ type OnSendRegisterFut = Ready<TransResult<Self, (Self::OnSendRegisterRegistered, ())>>;
|
|
|
fn on_send_register(self) -> Self::OnSendRegisterFut {
|
|
|
- ready(TransResult::Ok(RegisteredState {
|
|
|
- sender: self.sender,
|
|
|
- }))
|
|
|
+ ready(TransResult::Ok((
|
|
|
+ RegisteredState {
|
|
|
+ sender: self.sender,
|
|
|
+ },
|
|
|
+ (),
|
|
|
+ )))
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -522,40 +527,52 @@ mod client_callback {
|
|
|
self,
|
|
|
to: ServiceAddr,
|
|
|
msg: Register,
|
|
|
- ) -> Result<ClientHandleManual<Init, NewState>> {
|
|
|
- {
|
|
|
- let mut guard = self.state.lock().await;
|
|
|
- let state = guard
|
|
|
- .take()
|
|
|
- .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
|
|
|
- let new_state = match state {
|
|
|
- ClientStateManual::Unregistered(state) => {
|
|
|
- match state.on_send_register().await {
|
|
|
- TransResult::Ok(new_state) => {
|
|
|
- let msg = ClientCallbackMsgs::Register(msg);
|
|
|
- self.runtime
|
|
|
- .send_service(to, self.name.clone(), msg)
|
|
|
- .await?;
|
|
|
- ClientStateManual::Registered(new_state)
|
|
|
- }
|
|
|
- TransResult::Abort { from, err } => {
|
|
|
- log::warn!(
|
|
|
- "Aborted transition from the {} state: {}",
|
|
|
- "Unregistered",
|
|
|
- err
|
|
|
- );
|
|
|
- ClientStateManual::Unregistered(from)
|
|
|
- }
|
|
|
- TransResult::Fatal { err } => {
|
|
|
- return Err(err);
|
|
|
- }
|
|
|
+ ) -> TransResult<
|
|
|
+ Self,
|
|
|
+ (
|
|
|
+ ClientHandleManual<Init, NewState>,
|
|
|
+ Init::OnSendRegisterReturn,
|
|
|
+ ),
|
|
|
+ > {
|
|
|
+ let mut guard = self.state.lock().await;
|
|
|
+ let state = guard
|
|
|
+ .take()
|
|
|
+ .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
|
|
|
+ match state {
|
|
|
+ ClientStateManual::Unregistered(state) => match state.on_send_register().await {
|
|
|
+ TransResult::Ok((new_state, return_var)) => {
|
|
|
+ let msg = ClientCallbackMsgs::Register(msg);
|
|
|
+ let result = self.runtime.send_service(to, self.name.clone(), msg).await;
|
|
|
+ if let Err(err) = result {
|
|
|
+ return TransResult::Fatal { err };
|
|
|
}
|
|
|
+ *guard = Some(ClientStateManual::Registered(new_state));
|
|
|
+ drop(guard);
|
|
|
+ TransResult::Ok((self.new_type(), return_var))
|
|
|
}
|
|
|
- state => state,
|
|
|
- };
|
|
|
- *guard = Some(new_state);
|
|
|
+ TransResult::Abort { from, err } => {
|
|
|
+ *guard = Some(ClientStateManual::Unregistered(from));
|
|
|
+ drop(guard);
|
|
|
+ return TransResult::Abort { from: self, err };
|
|
|
+ }
|
|
|
+ TransResult::Fatal { err } => {
|
|
|
+ return TransResult::Fatal { err };
|
|
|
+ }
|
|
|
+ },
|
|
|
+ state => {
|
|
|
+ let name = state.name();
|
|
|
+ *guard = Some(state);
|
|
|
+ drop(guard);
|
|
|
+ TransResult::Abort {
|
|
|
+ from: self,
|
|
|
+ err: bterr!(
|
|
|
+ "Unexpected state '{}' for '{}' method.",
|
|
|
+ name,
|
|
|
+ "send_register"
|
|
|
+ ),
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- Ok(self.new_type())
|
|
|
}
|
|
|
}
|
|
|
|