diff --git a/src/core/player.rs b/src/core/player.rs index cf1969e..8d8f10b 100644 --- a/src/core/player.rs +++ b/src/core/player.rs @@ -15,12 +15,11 @@ use std::{ use prometheus::{IntGauge, Registry as MetricsRegistry}; use tokio::{ sync::mpsc::{channel, Receiver, Sender}, - sync::oneshot::{channel as oneshot, Sender as OneshotSender}, task::JoinHandle, }; use crate::{ - core::room::{RoomId, RoomInfo, RoomRegistry}, + core::room::{RoomHandle, RoomId, RoomInfo, RoomRegistry}, prelude::*, util::table::{AnonTable, Key as AnonKey}, }; @@ -52,7 +51,7 @@ pub struct PlayerConnection { player_handle: PlayerHandle, } impl PlayerConnection { - pub async fn send_message(&mut self, room_id: RoomId, body: String) { + pub async fn send_message(&mut self, room_id: RoomId, body: String) -> Result<()> { self.player_handle .send_message(room_id, self.connection_id.clone(), body) .await @@ -64,6 +63,19 @@ impl PlayerConnection { .await } + pub async fn change_topic(&mut self, room_id: RoomId, new_topic: ByteVec) -> Result<()> { + let (promise, deferred) = oneshot(); + let cmd = Cmd::ChangeTopic { + room_id, + new_topic, + promise, + }; + self.player_handle + .send(PlayerCommand::Cmd(cmd, self.connection_id.clone())) + .await; + Ok(deferred.await?) + } + pub async fn send(&self, command: PlayerCommand) { self.player_handle.send(command).await; } @@ -89,14 +101,20 @@ impl PlayerHandle { } } - pub async fn send_message(&self, room_id: RoomId, connection_id: ConnectionId, body: String) { - self.tx - .send(PlayerCommand::SendMessage { - room_id, - connection_id, - body, - }) - .await; + pub async fn send_message( + &self, + room_id: RoomId, + connection_id: ConnectionId, + body: String, + ) -> Result<()> { + let (promise, deferred) = oneshot(); + let cmd = Cmd::SendMessage { + room_id, + body, + promise, + }; + self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; + Ok(deferred.await?) } pub async fn join_room( @@ -105,17 +123,15 @@ impl PlayerHandle { connection_id: ConnectionId, ) -> Result { let (promise, deferred) = oneshot(); - self.tx - .send(PlayerCommand::JoinRoom { - room_id, - connection_id, - promise, - }) - .await; + let cmd = Cmd::JoinRoom { + room_id, + promise: promise, + }; + self.tx.send(PlayerCommand::Cmd(cmd, connection_id)).await; Ok(deferred.await?) } - pub async fn send(&self, command: PlayerCommand) { + async fn send(&self, command: PlayerCommand) { self.tx.send(command).await; } @@ -128,21 +144,30 @@ pub enum PlayerCommand { /** Commands from connections */ AddConnection { sender: Sender, - promise: OneshotSender, + promise: Promise, }, + Cmd(Cmd, ConnectionId), + /// Query - responds with a list of rooms the player is a member of. + GetRooms(Promise>), + /** Events from rooms */ + Update(Updates), +} + +pub enum Cmd { JoinRoom { room_id: RoomId, - connection_id: ConnectionId, promise: Promise, }, SendMessage { room_id: RoomId, - connection_id: ConnectionId, body: String, + promise: Promise<()>, + }, + ChangeTopic { + room_id: RoomId, + new_topic: ByteVec, + promise: Promise<()>, }, - GetRooms(Promise>), - /** Events from rooms */ - Update(Updates), } /// Player update event type which is sent to a player actor and from there to a connection handler. @@ -154,14 +179,12 @@ pub enum Updates { }, NewMessage { room_id: RoomId, - connection_id: ConnectionId, author_id: PlayerId, body: String, }, RoomJoined { room_id: RoomId, new_member_id: PlayerId, - connection_id: ConnectionId, }, } @@ -225,7 +248,7 @@ impl Player { let (tx, mut rx) = channel(32); let handle = PlayerHandle { tx }; let handle_clone = handle.clone(); - let mut my_rooms = HashMap::new(); + let mut my_rooms: HashMap = HashMap::new(); let fiber = tokio::task::spawn(async move { while let Some(cmd) = rx.recv().await { match cmd { @@ -233,38 +256,6 @@ impl Player { let connection_id = self.connections.insert(sender); promise.send(ConnectionId(connection_id)); } - PlayerCommand::JoinRoom { - room_id, - connection_id, - promise, - } => { - let mut room = rooms.get_or_create_room(room_id.clone()); - room.subscribe(player_id.clone(), connection_id, handle.clone()) - .await; - my_rooms.insert(room_id.clone(), room.clone()); - let members = room.get_members().await; - promise.send(RoomInfo { - id: room_id, - members, - topic: b"some topic lol".to_vec(), - }); - } - PlayerCommand::SendMessage { - room_id, - connection_id, - body, - } => { - let room = rooms.get_room(&room_id); - match room { - Some(mut room) => { - room.send_message(player_id.clone(), connection_id, body) - .await; - } - None => { - tracing::info!("no room found"); - } - } - } PlayerCommand::GetRooms(promise) => { let mut response = vec![]; for (_, handle) in &my_rooms { @@ -277,10 +268,77 @@ impl Player { "Player received an update, broadcasting to {} connections", self.connections.len() ); - for connection in &self.connections { + for (_, connection) in &self.connections { connection.send(update.clone()).await; } } + PlayerCommand::Cmd(cmd, connection_id) => match cmd { + Cmd::JoinRoom { room_id, promise } => { + let mut room = rooms.get_or_create_room(room_id.clone()); + room.subscribe(player_id.clone(), handle.clone()).await; + my_rooms.insert(room_id.clone(), room.clone()); + let members = room.get_members().await; + promise.send(RoomInfo { + id: room_id.clone(), + members, + topic: b"some topic lol".to_vec(), + }); + let update = Updates::RoomJoined { + room_id, + new_member_id: player_id.clone(), + }; + for (a, b) in &self.connections { + if ConnectionId(a) == connection_id { + continue; + } + b.send(update.clone()).await; + } + } + Cmd::SendMessage { + room_id, + body, + promise, + } => { + let room = rooms.get_room(&room_id); + if let Some(room) = room { + room.send_message(player_id.clone(), body.clone()).await; + } else { + tracing::info!("no room found"); + } + promise.send(()); + let update = Updates::NewMessage { + room_id, + author_id: player_id.clone(), + body, + }; + for (a, b) in &self.connections { + if ConnectionId(a) == connection_id { + continue; + } + b.send(update.clone()).await; + } + } + Cmd::ChangeTopic { + room_id, + new_topic, + promise, + } => { + let room = rooms.get_room(&room_id); + if let Some(mut room) = room { + room.set_topic(player_id.clone(), new_topic.clone()).await; + } else { + tracing::info!("no room found"); + } + promise.send(()); + let update = Updates::RoomTopicChanged { room_id, new_topic }; + for (a, b) in &self.connections { + if ConnectionId(a) == connection_id { + continue; + } + b.send(update.clone()).await; + } + } + }, } } self diff --git a/src/core/room.rs b/src/core/room.rs index 9b9caca..cccb129 100644 --- a/src/core/room.rs +++ b/src/core/room.rs @@ -9,12 +9,10 @@ use prometheus::{IntGauge, Registry as MetricRegistry}; use tokio::sync::RwLock as AsyncRwLock; use crate::{ - core::player::{PlayerHandle, PlayerId}, + core::player::{PlayerHandle, PlayerId, Updates}, prelude::*, }; -use super::player::{ConnectionId, Updates}; - /// Opaque room id #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct RoomId(ByteVec); @@ -82,25 +80,14 @@ struct RoomRegistryInner { #[derive(Clone)] pub struct RoomHandle(Arc>); impl RoomHandle { - pub async fn subscribe( - &self, - player_id: PlayerId, - connection_id: ConnectionId, - player_handle: PlayerHandle, - ) { + pub async fn subscribe(&self, player_id: PlayerId, player_handle: PlayerHandle) { let mut lock = self.0.write().await; - lock.add_subscriber(player_id, connection_id, player_handle) - .await; + lock.add_subscriber(player_id, player_handle).await; } - pub async fn send_message( - &self, - player_id: PlayerId, - connection_id: ConnectionId, - body: String, - ) { + pub async fn send_message(&self, player_id: PlayerId, body: String) { let lock = self.0.read().await; - lock.send_message(player_id, connection_id, body).await; + lock.send_message(player_id, body).await; } pub async fn get_members(&self) -> Vec { @@ -124,16 +111,14 @@ impl RoomHandle { } } - pub async fn set_topic(&mut self, new_topic: ByteVec) { + pub async fn set_topic(&mut self, changer_id: PlayerId, new_topic: ByteVec) { let mut lock = self.0.write().await; lock.topic = new_topic.clone(); - for (_, player_handle) in &lock.subscriptions { - let update = Updates::RoomTopicChanged { - room_id: lock.room_id.clone(), - new_topic: new_topic.clone(), - }; - player_handle.update(update.clone()).await; - } + let update = Updates::RoomTopicChanged { + room_id: lock.room_id.clone(), + new_topic: new_topic.clone(), + }; + lock.broadcast_update(update, &changer_id).await; } } @@ -143,33 +128,31 @@ struct Room { topic: ByteVec, } impl Room { - async fn add_subscriber( - &mut self, - player_id: PlayerId, - connection_id: ConnectionId, - player_handle: PlayerHandle, - ) { + async fn add_subscriber(&mut self, player_id: PlayerId, player_handle: PlayerHandle) { tracing::info!("Adding a subscriber to room"); self.subscriptions.insert(player_id.clone(), player_handle); let update = Updates::RoomJoined { room_id: self.room_id.clone(), new_member_id: player_id.clone(), - connection_id: connection_id.clone(), }; - for (_, sub) in &self.subscriptions { - sub.update(update.clone()).await; - } + self.broadcast_update(update, &player_id).await; } - async fn send_message(&self, author_id: PlayerId, connection_id: ConnectionId, body: String) { + async fn send_message(&self, author_id: PlayerId, body: String) { tracing::info!("Adding a message to room"); let update = Updates::NewMessage { room_id: self.room_id.clone(), - connection_id, - author_id, + author_id: author_id.clone(), body, }; - for (_, sub) in &self.subscriptions { + self.broadcast_update(update, &author_id).await; + } + + async fn broadcast_update(&self, update: Updates, except: &PlayerId) { + for (player_id, sub) in &self.subscriptions { + if player_id == except { + continue; + } log::info!("Sending a message from room to player"); sub.update(update.clone()).await; } diff --git a/src/projections/irc/mod.rs b/src/projections/irc/mod.rs index ff24623..ce62fa7 100644 --- a/src/projections/irc/mod.rs +++ b/src/projections/irc/mod.rs @@ -232,12 +232,12 @@ async fn handle_registered_socket<'a>( } else { len }; - handle_incoming_message(&buffer[0..len], &config, &user, &rooms, &mut connection, writer).await?; + handle_incoming_message(&buffer[0..len], &config, &user, &rooms, &mut connection, &player_id, writer).await?; buffer.clear(); }, update = connection.receiver.recv() => { match update.unwrap() { - Updates::RoomJoined { new_member_id, connection_id, room_id } => { + Updates::RoomJoined { new_member_id, room_id } => { if player_id == new_member_id { if let Some(room) = rooms.get_room(&room_id) { let room_info = room.get_room_info().await; @@ -256,15 +256,13 @@ async fn handle_registered_socket<'a>( writer.flush().await? } }, - Updates::NewMessage { author_id, connection_id, room_id, body } => { - if player_id != author_id || connection.connection_id != connection_id { - ServerMessage { - tags: vec![], - sender: Some(author_id.as_bytes().clone()), - body: ServerMessageBody::PrivateMessage { target: Recipient::Chan(Chan::Global(room_id.as_bytes().clone())), body: body.as_bytes().to_vec() } - }.write_async(writer).await?; - writer.flush().await? - } + Updates::NewMessage { author_id, room_id, body } => { + ServerMessage { + tags: vec![], + sender: Some(author_id.as_bytes().clone()), + body: ServerMessageBody::PrivateMessage { target: Recipient::Chan(Chan::Global(room_id.as_bytes().clone())), body: body.as_bytes().to_vec() } + }.write_async(writer).await?; + writer.flush().await? }, Updates::RoomTopicChanged { room_id, new_topic } => { ServerMessage { @@ -291,6 +289,7 @@ async fn handle_incoming_message( user: &RegisteredUser, rooms: &RoomRegistry, user_handle: &mut PlayerConnection, + player_id: &PlayerId, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { let parsed = client_message(buffer); @@ -316,7 +315,7 @@ async fn handle_incoming_message( Recipient::Chan(Chan::Global(chan)) => match String::from_utf8(body) { Ok(body) => { let room_id = RoomId::from_bytes(chan)?; - user_handle.send_message(room_id, body.clone()).await + user_handle.send_message(room_id, body.clone()).await?; } Err(err) => log::warn!("failed to parse incoming message: {err}"), }, @@ -326,14 +325,25 @@ async fn handle_incoming_message( match chan { Chan::Global(chan) => { let room_id = RoomId::from_bytes(chan)?; - let room = rooms.get_room(&room_id); - if let Some(mut room) = room { - room.set_topic(topic).await; + user_handle + .change_topic(room_id.clone(), topic.clone()) + .await?; + ServerMessage { + tags: vec![], + sender: Some(config.server_name.as_bytes().to_vec()), + body: ServerMessageBody::N332Topic { + client: user.nickname.clone(), + chat: Chan::Global(room_id.as_bytes().clone()), + topic, + }, } + .write_async(writer) + .await?; + writer.flush().await?; } Chan::Local(_) => {} }; - }, + } cmd => { log::warn!("Not implemented handler for client command: {cmd:?}"); } @@ -353,9 +363,11 @@ async fn handle_join( writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { match chan { - Chan::Global(chan) => { - let room_id = RoomId::from_bytes(chan.clone())?; + Chan::Global(chan_name) => { + let room_id = RoomId::from_bytes(chan_name.clone())?; let room_info = user_handle.join_room(room_id).await?; + produce_on_join_cmd_messages(&config, &user, chan, &room_info, writer).await?; + writer.flush().await?; } Chan::Local(_) => {} }; diff --git a/src/util/table.rs b/src/util/table.rs index c2bd0cc..2d93866 100644 --- a/src/util/table.rs +++ b/src/util/table.rs @@ -44,15 +44,15 @@ impl AnonTable { pub struct AnonTableIterator<'a, V>(<&'a HashMap as IntoIterator>::IntoIter); impl<'a, V> Iterator for AnonTableIterator<'a, V> { - type Item = &'a V; + type Item = (Key, &'a V); - fn next(&mut self) -> Option<&'a V> { - self.0.next().map(|a| a.1) + fn next(&mut self) -> Option<(Key, &'a V)> { + self.0.next().map(|(k, v)| (Key(*k), v)) } } impl<'a, V> IntoIterator for &'a AnonTable { - type Item = &'a V; + type Item = (Key, &'a V); type IntoIter = AnonTableIterator<'a, V>; @@ -63,15 +63,15 @@ impl<'a, V> IntoIterator for &'a AnonTable { pub struct AnonTableMutIterator<'a, V>(<&'a mut HashMap as IntoIterator>::IntoIter); impl<'a, V> Iterator for AnonTableMutIterator<'a, V> { - type Item = &'a mut V; + type Item = (Key, &'a mut V); - fn next(&mut self) -> Option<&'a mut V> { - self.0.next().map(|a| a.1) + fn next(&mut self) -> Option<(Key, &'a mut V)> { + self.0.next().map(|(k, v)| (Key(*k), v)) } } impl<'a, V> IntoIterator for &'a mut AnonTable { - type Item = &'a mut V; + type Item = (Key, &'a mut V); type IntoIter = AnonTableMutIterator<'a, V>;