fix peer rpc and ospf route

This commit is contained in:
sijie.sun 2024-03-21 23:33:19 +08:00 committed by Sijie.Sun
parent ba1795a113
commit a4af83e82d
4 changed files with 71 additions and 21 deletions

View File

@ -82,6 +82,8 @@ jobs:
fi
fi
# see https://github.com/rust-lang/rustup/issues/3709
rustup set auto-self-update disable
rustup install 1.75
rustup default 1.75
rustup target add $TARGET

View File

@ -71,7 +71,7 @@ pub enum CtrlPacketPayload {
RoutePacket(RoutePacket),
Ping(u32),
Pong(u32),
TaRpc(u32, bool, Vec<u8>), // u32: service_id, bool: is_req, Vec<u8>: rpc body
TaRpc(u32, u32, bool, Vec<u8>), // u32: service_id, u32: transact_id, bool: is_req, Vec<u8>: rpc body
}
impl CtrlPacketPayload {
@ -206,10 +206,11 @@ impl Packet {
from_peer: PeerId,
to_peer: PeerId,
service_id: u32,
transact_id: u32,
is_req: bool,
body: Vec<u8>,
) -> Self {
let ta_rpc = CtrlPacketPayload::TaRpc(service_id, is_req, body);
let ta_rpc = CtrlPacketPayload::TaRpc(service_id, transact_id, is_req, body);
Packet::new(
from_peer,
to_peer,

View File

@ -1294,6 +1294,12 @@ mod tests {
peer_mgr
}
fn check_rpc_counter(route: &Arc<PeerRoute>, peer_id: PeerId, max_tx: u32, max_rx: u32) {
let (tx1, rx1) = get_rpc_counter(route, peer_id);
assert!(tx1 <= max_tx);
assert!(rx1 <= max_rx);
}
#[tokio::test]
async fn ospf_route_2node() {
let p_a = create_mock_pmgr().await;
@ -1378,14 +1384,21 @@ mod tests {
}
connect_peer_manager(p_a.clone(), p_c.clone()).await;
for r in vec![r_a.clone(), r_b.clone(), r_c.clone()].iter() {
// for full-connected 3 nodes, the sessions between them will be a cycle
wait_for_condition(
|| async { r.service_impl.sessions.len() == 2 },
Duration::from_secs(3),
)
.await;
}
// for full-connected 3 nodes, the sessions between them may be a cycle or a line
wait_for_condition(
|| async {
let mut lens = vec![
r_a.service_impl.sessions.len(),
r_b.service_impl.sessions.len(),
r_c.service_impl.sessions.len(),
];
lens.sort();
lens == vec![1, 1, 2] || lens == vec![2, 2, 2]
},
Duration::from_secs(3),
)
.await;
let p_d = create_mock_pmgr().await;
let r_d = create_mock_route(p_d.clone()).await;
@ -1422,7 +1435,7 @@ mod tests {
tokio::time::sleep(Duration::from_secs(2)).await;
assert!([(2, 2), (1, 1)].contains(&get_rpc_counter(&r_e, last_p.my_peer_id())));
check_rpc_counter(&r_e, last_p.my_peer_id(), 2, 2);
for r in all_route.iter() {
if r.my_peer_id != last_p.my_peer_id() {
@ -1537,7 +1550,7 @@ mod tests {
assert_eq!(1, r_b.list_routes().await.len());
assert!([(2, 2), (1, 1)].contains(&get_rpc_counter(&r_a, p_b.my_peer_id())));
check_rpc_counter(&r_a, p_b.my_peer_id(), 2, 2);
p_a.get_peer_map()
.close_peer(p_b.my_peer_id())
@ -1561,6 +1574,6 @@ mod tests {
tokio::time::sleep(Duration::from_secs(1)).await;
println!("session: {:?}", r_a.session_mgr.dump_sessions());
assert!([(2, 2), (1, 1)].contains(&get_rpc_counter(&r_a, p_b.my_peer_id())));
check_rpc_counter(&r_a, p_b.my_peer_id(), 2, 2);
}
}

View File

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{atomic::AtomicU32, Arc};
use dashmap::DashMap;
use futures::{SinkExt, StreamExt};
@ -19,6 +19,7 @@ use crate::{
use super::packet::CtrlPacketPayload;
type PeerRpcServiceId = u32;
type PeerRpcTransactId = u32;
#[async_trait::async_trait]
#[auto_impl::auto_impl(Arc)]
@ -38,7 +39,7 @@ struct PeerRpcEndPoint {
type PeerRpcEndPointCreator = Box<dyn Fn(PeerId) -> PeerRpcEndPoint + Send + Sync + 'static>;
#[derive(Hash, Eq, PartialEq, Clone)]
struct PeerRpcClientCtxKey(PeerId, PeerRpcServiceId);
struct PeerRpcClientCtxKey(PeerId, PeerRpcServiceId, PeerRpcTransactId);
// handle rpc request from one peer
pub struct PeerRpcManager {
@ -50,6 +51,8 @@ pub struct PeerRpcManager {
peer_rpc_endpoints: Arc<DashMap<(PeerId, PeerRpcServiceId), PeerRpcEndPoint>>,
client_resp_receivers: Arc<DashMap<PeerRpcClientCtxKey, PacketSender>>,
transact_id: AtomicU32,
}
impl std::fmt::Debug for PeerRpcManager {
@ -65,6 +68,7 @@ struct TaRpcPacketInfo {
from_peer: PeerId,
to_peer: PeerId,
service_id: PeerRpcServiceId,
transact_id: PeerRpcTransactId,
is_req: bool,
content: Vec<u8>,
}
@ -80,6 +84,8 @@ impl PeerRpcManager {
peer_rpc_endpoints: Arc::new(DashMap::new()),
client_resp_receivers: Arc::new(DashMap::new()),
transact_id: AtomicU32::new(0),
}
}
@ -107,6 +113,7 @@ impl PeerRpcManager {
let tspt = tspt.clone();
tasks.spawn(async move {
let mut cur_req_peer_id = None;
let mut cur_transact_id = 0;
loop {
tokio::select! {
Some(resp) = client_transport.next() => {
@ -133,6 +140,7 @@ impl PeerRpcManager {
tspt.my_peer_id(),
cur_req_peer_id,
service_id,
cur_transact_id,
false,
serialized_resp.unwrap(),
);
@ -161,6 +169,7 @@ impl PeerRpcManager {
assert_eq!(info.service_id, service_id);
cur_req_peer_id = Some(packet.from_peer.clone().into());
cur_transact_id = info.transact_id;
tracing::trace!("recv packet from peer, packet: {:?}", packet);
@ -213,10 +222,11 @@ impl PeerRpcManager {
fn parse_rpc_packet(packet: &Packet) -> Result<TaRpcPacketInfo, Error> {
let ctrl_packet_payload = CtrlPacketPayload::from_packet2(&packet);
match &ctrl_packet_payload {
CtrlPacketPayload::TaRpc(id, is_req, body) => Ok(TaRpcPacketInfo {
CtrlPacketPayload::TaRpc(id, tid, is_req, body) => Ok(TaRpcPacketInfo {
from_peer: packet.from_peer.into(),
to_peer: packet.to_peer.into(),
service_id: *id,
transact_id: *tid,
is_req: *is_req,
content: body.clone(),
}),
@ -257,9 +267,11 @@ impl PeerRpcManager {
endpoint.packet_sender.send(packet).unwrap();
} else {
if let Some(a) = client_resp_receivers
.get(&PeerRpcClientCtxKey(info.from_peer, info.service_id))
{
if let Some(a) = client_resp_receivers.get(&PeerRpcClientCtxKey(
info.from_peer,
info.service_id,
info.transact_id,
)) {
log::trace!("recv resp: {:?}", packet);
if let Err(e) = a.send(packet) {
tracing::error!(error = ?e, "send resp to client failed");
@ -291,6 +303,10 @@ impl PeerRpcManager {
let (mut server_s, mut server_r) = server_transport.split();
let transact_id = self
.transact_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let tspt = self.tspt.clone();
tasks.spawn(async move {
while let Some(a) = server_r.next().await {
@ -309,6 +325,7 @@ impl PeerRpcManager {
tspt.my_peer_id(),
dst_peer_id,
service_id,
transact_id,
true,
a.unwrap(),
);
@ -345,11 +362,16 @@ impl PeerRpcManager {
tracing::warn!("[PEER RPC MGR] server packet read aborted");
});
let key = PeerRpcClientCtxKey(dst_peer_id, service_id, transact_id);
let _insert_ret = self
.client_resp_receivers
.insert(PeerRpcClientCtxKey(dst_peer_id, service_id), packet_sender);
.insert(key.clone(), packet_sender);
f(client_transport).await
let ret = f(client_transport).await;
self.client_resp_receivers.remove(&key);
ret
}
pub fn my_peer_id(&self) -> PeerId {
@ -486,6 +508,18 @@ mod tests {
println!("ip_list: {:?}", ip_list);
assert_eq!(ip_list.as_ref().unwrap(), "hello abc");
// call again
let ip_list = peer_mgr_a
.get_peer_rpc_mgr()
.do_client_rpc_scoped(1, peer_mgr_b.my_peer_id(), |c| async {
let c = TestRpcServiceClient::new(tarpc::client::Config::default(), c).spawn();
let ret = c.hello(tarpc::context::current(), "abcd".to_owned()).await;
ret
})
.await;
println!("ip_list: {:?}", ip_list);
assert_eq!(ip_list.as_ref().unwrap(), "hello abcd");
let ip_list = peer_mgr_c
.get_peer_rpc_mgr()
.do_client_rpc_scoped(1, peer_mgr_b.my_peer_id(), |c| async {