add video channel

Signed-off-by: fufesou <shuanglongchen@yeah.net>
This commit is contained in:
fufesou
2021-12-20 15:10:51 +08:00
parent 07ea52d620
commit 3ffd8ac146
4 changed files with 177 additions and 76 deletions

View File

@@ -26,6 +26,7 @@ lazy_static::lazy_static! {
pub struct ConnInner {
id: i32,
tx: Option<Sender>,
tx_video: Option<Sender>,
}
pub struct Connection {
@@ -65,6 +66,12 @@ impl Subscriber for ConnInner {
allow_err!(tx.send((Instant::now(), msg)));
});
}
fn send_video_frame(&mut self, tm: std::time::Instant, msg: Arc<Message>) {
self.tx_video.as_mut().map(|tx| {
allow_err!(tx.send((tm.into(), msg)));
});
}
}
const TEST_DELAY_TIMEOUT: Duration = Duration::from_secs(3);
@@ -87,8 +94,13 @@ impl Connection {
let (tx_from_cm, mut rx_from_cm) = mpsc::unbounded_channel::<ipc::Data>();
let (tx_to_cm, rx_to_cm) = mpsc::unbounded_channel::<ipc::Data>();
let (tx, mut rx) = mpsc::unbounded_channel::<(Instant, Arc<Message>)>();
let (tx_video, mut rx_video) = mpsc::unbounded_channel::<(Instant, Arc<Message>)>();
let mut conn = Self {
inner: ConnInner { id, tx: Some(tx) },
inner: ConnInner {
id,
tx: Some(tx),
tx_video: Some(tx_video),
},
stream,
server,
hash,
@@ -131,8 +143,11 @@ impl Connection {
let mut test_delay_timer =
time::interval_at(Instant::now() + TEST_DELAY_TIMEOUT, TEST_DELAY_TIMEOUT);
let mut last_recv_time = Instant::now();
loop {
tokio::select! {
biased;
Some(data) = rx_from_cm.recv() => {
match data {
ipc::Data::Authorize => {
@@ -193,26 +208,6 @@ impl Connection {
}
_ => {}
}
}
Some((instant, value)) = rx.recv() => {
let latency = instant.elapsed().as_millis() as i64;
super::video_service::update_internal_latency(id, latency);
let msg: &Message = &value;
if latency > 1000 {
match &msg.union {
Some(message::Union::video_frame(_)) => {
continue;
}
Some(message::Union::audio_frame(_)) => {
continue;
}
_ => {}
}
}
if let Err(err) = conn.stream.send(msg).await {
conn.on_close(&err.to_string(), false);
break;
}
},
res = conn.stream.next() => {
if let Some(res) = res {
@@ -244,7 +239,32 @@ impl Connection {
} else {
conn.timer = time::interval_at(Instant::now() + SEC30, SEC30);
}
}
},
Some((instant, value)) = rx_video.recv() => {
video_service::notify_video_frame_feched(id, Some(instant.into()));
if let Err(err) = conn.stream.send(&value as &Message).await {
conn.on_close(&err.to_string(), false);
break;
}
},
Some((instant, value)) = rx.recv() => {
let latency = instant.elapsed().as_millis() as i64;
let msg: &Message = &value;
if latency > 1000 {
match &msg.union {
Some(message::Union::audio_frame(_)) => {
log::info!("audio frame latency {}", instant.elapsed().as_secs_f32());
continue;
}
_ => {}
}
}
if let Err(err) = conn.stream.send(msg).await {
conn.on_close(&err.to_string(), false);
break;
}
},
_ = test_delay_timer.tick() => {
if last_recv_time.elapsed() >= SEC30 {
conn.on_close("Timeout", true);
@@ -263,7 +283,8 @@ impl Connection {
}
}
}
super::video_service::update_internal_latency(id, 0);
video_service::notify_video_frame_feched(id, None);
super::video_service::update_test_latency(id, 0);
super::video_service::update_image_quality(id, None);
if let Some(forward) = conn.port_forward_socket.as_mut() {