From b0789d545781677125a8c1c8f8c8f5229913949c Mon Sep 17 00:00:00 2001 From: Mikhail Date: Tue, 28 May 2024 20:31:02 +0200 Subject: [PATCH] Send messages on join --- crates/lavina-core/src/player.rs | 21 +++++-- crates/lavina-core/src/repo/room.rs | 1 - crates/lavina-core/src/room.rs | 4 +- crates/projection-irc/src/lib.rs | 83 +++++++++++++++++++------- crates/projection-xmpp/src/presence.rs | 2 +- crates/proto-irc/src/client.rs | 20 +++---- crates/proto-irc/src/lib.rs | 24 ++++---- crates/proto-irc/src/server.rs | 12 ++-- 8 files changed, 106 insertions(+), 61 deletions(-) diff --git a/crates/lavina-core/src/player.rs b/crates/lavina-core/src/player.rs index 14a50ca..a5db4b7 100644 --- a/crates/lavina-core/src/player.rs +++ b/crates/lavina-core/src/player.rs @@ -112,9 +112,13 @@ impl PlayerConnection { } #[tracing::instrument(skip(self), name = "PlayerConnection::get_room_message_history")] - pub async fn get_room_message_history(&self, room_id: RoomId) -> Result> { + pub async fn get_room_message_history(&self, room_id: &RoomId, limit: u32) -> Result> { let (promise, deferred) = oneshot(); - let cmd = ClientCommand::GetRoomHistory { room_id, promise }; + let cmd = ClientCommand::GetRoomHistory { + room_id: room_id.clone(), + promise, + limit, + }; self.player_handle.send(ActorCommand::ClientCommand(cmd, self.connection_id.clone())).await; Ok(deferred.await?) } @@ -223,6 +227,7 @@ pub enum ClientCommand { GetRoomHistory { room_id: RoomId, promise: Promise>, + limit: u32, }, } @@ -522,8 +527,12 @@ impl Player { let result = self.check_user_existence(recipient).await; let _ = promise.send(result); } - ClientCommand::GetRoomHistory { room_id, promise } => { - let result = self.get_room_history(room_id).await; + ClientCommand::GetRoomHistory { + room_id, + promise, + limit, + } => { + let result = self.get_room_history(room_id, limit).await; let _ = promise.send(result); } } @@ -575,11 +584,11 @@ impl Player { } #[tracing::instrument(skip(self), name = "Player::retrieve_room_history")] - async fn get_room_history(&mut self, room_id: RoomId) -> Vec { + async fn get_room_history(&mut self, room_id: RoomId, limit: u32) -> Vec { let room = self.my_rooms.get(&room_id); if let Some(room) = room { match room { - RoomRef::Local(room) => room.get_message_history(&self.services).await, + RoomRef::Local(room) => room.get_message_history(&self.services, limit).await, RoomRef::Remote { node_id } => { todo!() } diff --git a/crates/lavina-core/src/repo/room.rs b/crates/lavina-core/src/repo/room.rs index 19ab90b..b60cb56 100644 --- a/crates/lavina-core/src/repo/room.rs +++ b/crates/lavina-core/src/repo/room.rs @@ -48,7 +48,6 @@ impl Storage { room_id = ? order by messages.id - -- limit ?; ", // todo: implement limit diff --git a/crates/lavina-core/src/room.rs b/crates/lavina-core/src/room.rs index 1b1ce30..3a4df49 100644 --- a/crates/lavina-core/src/room.rs +++ b/crates/lavina-core/src/room.rs @@ -160,8 +160,8 @@ impl RoomHandle { lock.broadcast_update(update, player_id).await; } - pub async fn get_message_history(&self, services: &LavinaCore) -> Vec { - return services.storage.get_room_message_history(self.0.read().await.storage_id).await.unwrap(); + pub async fn get_message_history(&self, services: &Services, limit: u32) -> Vec { + return services.storage.get_room_message_history(self.0.read().await.storage_id, limit).await.unwrap(); } #[tracing::instrument(skip(self), name = "RoomHandle::unsubscribe")] diff --git a/crates/projection-irc/src/lib.rs b/crates/projection-irc/src/lib.rs index 49c929d..3390b9d 100644 --- a/crates/projection-irc/src/lib.rs +++ b/crates/projection-irc/src/lib.rs @@ -25,7 +25,7 @@ use proto_irc::client::{client_message, ClientMessage}; use proto_irc::server::CapSubBody; use proto_irc::server::{AwayStatus, ServerMessage, ServerMessageBody}; use proto_irc::user::PrefixedNick; -use proto_irc::{Chan, Recipient, Tag}; +use proto_irc::{ChannelName, Recipient, Tag}; use sasl::AuthBody; mod cap; use handler::Handler; @@ -491,7 +491,14 @@ async fn handle_registered_socket<'a>( let rooms_list = connection.get_rooms().await?; for room in &rooms_list { - produce_on_join_cmd_messages(&config, &user, &Chan::Global(room.id.as_inner().clone()), room, writer).await?; + produce_on_join_cmd_messages( + &config, + &user, + &ChannelName::Global(room.id.as_inner().clone()), + room, + writer, + ) + .await?; } writer.flush().await?; @@ -576,7 +583,7 @@ async fn handle_update( if player_id == &new_member_id { if let Some(room) = core.get_room(&room_id).await { let room_info = room.get_room_info().await; - let chan = Chan::Global(room_id.as_inner().clone()); + let chan = ChannelName::Global(room_id.as_inner().clone()); produce_on_join_cmd_messages(&config, &user, &chan, &room_info, writer).await?; writer.flush().await?; } else { @@ -586,7 +593,7 @@ async fn handle_update( ServerMessage { tags: vec![], sender: Some(new_member_id.as_inner().clone()), - body: ServerMessageBody::Join(Chan::Global(room_id.as_inner().clone())), + body: ServerMessageBody::Join(ChannelName::Global(room_id.as_inner().clone())), } .write_async(writer) .await?; @@ -600,7 +607,7 @@ async fn handle_update( ServerMessage { tags: vec![], sender: Some(former_member_id.as_inner().clone()), - body: ServerMessageBody::Part(Chan::Global(room_id.as_inner().clone())), + body: ServerMessageBody::Part(ChannelName::Global(room_id.as_inner().clone())), } .write_async(writer) .await?; @@ -624,7 +631,7 @@ async fn handle_update( tags, sender: Some(author_id.as_inner().clone()), body: ServerMessageBody::PrivateMessage { - target: Recipient::Chan(Chan::Global(room_id.as_inner().clone())), + target: Recipient::Chan(ChannelName::Global(room_id.as_inner().clone())), body: body.clone(), }, } @@ -638,7 +645,7 @@ async fn handle_update( sender: Some(config.server_name.clone()), body: ServerMessageBody::N332Topic { client: user.nickname.clone(), - chat: Chan::Global(room_id.as_inner().clone()), + chat: ChannelName::Global(room_id.as_inner().clone()), topic: new_topic, }, } @@ -651,7 +658,7 @@ async fn handle_update( ServerMessage { tags: vec![], sender: Some(player_id.as_inner().clone()), - body: ServerMessageBody::Part(Chan::Global(room_id.as_inner().clone())), + body: ServerMessageBody::Part(ChannelName::Global(room_id.as_inner().clone())), } .write_async(writer) .await?; @@ -726,7 +733,7 @@ async fn handle_incoming_message( handle_part(config, user, user_handle, &chan, writer).await?; } ClientMessage::PrivateMessage { recipient, body } => match recipient { - Recipient::Chan(Chan::Global(chan)) => { + Recipient::Chan(ChannelName::Global(chan)) => { let room_id = RoomId::try_from(chan)?; user_handle.send_message(room_id, body).await?; } @@ -738,7 +745,7 @@ async fn handle_incoming_message( }, ClientMessage::Topic { chan, topic } => { match chan { - Chan::Global(chan) => { + ChannelName::Global(chan) => { let room_id = RoomId::try_from(chan)?; user_handle.change_topic(room_id.clone(), topic.clone()).await?; ServerMessage { @@ -746,7 +753,7 @@ async fn handle_incoming_message( sender: Some(config.server_name.clone()), body: ServerMessageBody::N332Topic { client: user.nickname.clone(), - chat: Chan::Global(room_id.as_inner().clone()), + chat: ChannelName::Global(room_id.as_inner().clone()), topic, }, } @@ -754,7 +761,7 @@ async fn handle_incoming_message( .await?; writer.flush().await?; } - Chan::Local(_) => {} + ChannelName::Local(_) => {} }; } ClientMessage::Who { target } => match &target { @@ -780,7 +787,7 @@ async fn handle_incoming_message( .await?; writer.flush().await?; } - Recipient::Chan(Chan::Global(chan)) => { + Recipient::Chan(ChannelName::Global(chan)) => { let room = core.get_room(&RoomId::try_from(chan.clone())?).await; if let Some(room) = room { let room_info = room.get_room_info().await; @@ -807,7 +814,7 @@ async fn handle_incoming_message( .await?; writer.flush().await?; } - Recipient::Chan(Chan::Local(_)) => { + Recipient::Chan(ChannelName::Local(_)) => { log::warn!("Local chans not supported"); } }, @@ -860,8 +867,38 @@ async fn handle_incoming_message( log::info!("Received QUIT"); return Ok(HandleResult::Leave); } - // todo: implement chat history logic here. - // ClientMessage:ChatHistory { ... } => {} + ClientMessage::ChatHistory { chan, limit } => { + let channel_name = match chan.clone() { + ChannelName::Global(chan) => chan, + ChannelName::Local(chan) => chan, + }; + let room = core.get_room(&RoomId::try_from(channel_name.clone())?).await; + if let Some(room) = room { + let room_info = room.get_room_info().await; + let messages = user_handle.get_room_message_history(&room_info.id, limit).await?; + for message in messages { + let mut tags = vec![]; + if user.enabled_capabilities.contains(Capabilities::ServerTime) { + let tag = Tag { + key: "time".into(), + value: Some(message.created_at.to_rfc3339_opts(SecondsFormat::Millis, true).into()), + }; + tags.push(tag); + } + ServerMessage { + tags, + sender: Some(message.author_name.into()), + body: ServerMessageBody::PrivateMessage { + target: Recipient::Chan(chan.clone()), + body: message.content.into(), + }, + } + .write_async(writer) + .await?; + } + writer.flush().await?; + } + } cmd => { log::warn!("Not implemented handler for client command: {cmd:?}"); } @@ -897,11 +934,11 @@ async fn handle_join( config: &ServerConfig, user: &RegisteredUser, user_handle: &mut PlayerConnection, - chan: &Chan, + chan: &ChannelName, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { match chan { - Chan::Global(chan_name) => { + ChannelName::Global(chan_name) => { let room_id = RoomId::try_from(chan_name.clone())?; match user_handle.join_room(room_id).await? { JoinResult::Success(room_info) => { @@ -926,7 +963,7 @@ async fn handle_join( } writer.flush().await?; } - Chan::Local(_) => { + ChannelName::Local(_) => { // TODO handle join attempts to local chans with an error, we don't support these } }; @@ -937,16 +974,16 @@ async fn handle_part( config: &ServerConfig, user: &RegisteredUser, user_handle: &mut PlayerConnection, - chan: &Chan, + chan: &ChannelName, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { - if let Chan::Global(chan_name) = chan { + if let ChannelName::Global(chan_name) = chan { let room_id = RoomId::try_from(chan_name.clone())?; user_handle.leave_room(room_id).await?; ServerMessage { tags: vec![], sender: Some(user.nickname.clone()), - body: ServerMessageBody::Part(Chan::Global(chan_name.clone())), + body: ServerMessageBody::Part(ChannelName::Global(chan_name.clone())), } .write_async(writer) .await?; @@ -960,7 +997,7 @@ async fn handle_part( async fn produce_on_join_cmd_messages( config: &ServerConfig, user: &RegisteredUser, - chan: &Chan, + chan: &ChannelName, room_info: &RoomInfo, writer: &mut (impl AsyncWrite + Unpin), ) -> Result<()> { diff --git a/crates/projection-xmpp/src/presence.rs b/crates/projection-xmpp/src/presence.rs index 9e8e2e0..c2a1cb3 100644 --- a/crates/projection-xmpp/src/presence.rs +++ b/crates/projection-xmpp/src/presence.rs @@ -179,7 +179,7 @@ impl<'a> XmppConnection<'a> { #[tracing::instrument(skip(self), name = "XmppConnection::retrieve_message_history")] async fn retrieve_message_history(&self, room_name: &Name) -> Result> { let room_id = RoomId::try_from(room_name.0.clone())?; - let history_messages = self.user_handle.get_room_message_history(room_id).await?; + let history_messages = self.user_handle.get_room_message_history(&room_id, 50).await?; let mut response = vec![]; for history_message in history_messages.into_iter() { diff --git a/crates/proto-irc/src/client.rs b/crates/proto-irc/src/client.rs index 294807d..328c0f5 100644 --- a/crates/proto-irc/src/client.rs +++ b/crates/proto-irc/src/client.rs @@ -33,7 +33,7 @@ pub enum ClientMessage { realname: Str, }, /// `JOIN ` - Join(Chan), + Join(ChannelName), /// `MODE ` Mode { target: Recipient, @@ -48,11 +48,11 @@ pub enum ClientMessage { }, /// `TOPIC :` Topic { - chan: Chan, + chan: ChannelName, topic: Str, }, Part { - chan: Chan, + chan: ChannelName, message: Option, }, /// `PRIVMSG :` @@ -65,8 +65,8 @@ pub enum ClientMessage { reason: Str, }, Authenticate(Str), - Chathistory { - chan: Chan, + ChatHistory { + chan: ChannelName, limit: u32, }, } @@ -294,7 +294,7 @@ fn client_message_chathistory(input: &str) -> IResult<&str, ClientMessage> { let (input, _) = tag(" * ")(input)?; let (input, limit) = limit(input)?; - Ok((input, ClientMessage::Chathistory { chan, limit })) + Ok((input, ClientMessage::ChatHistory { chan, limit })) } fn limit(input: &str) -> IResult<&str, u32> { @@ -504,7 +504,7 @@ mod test { fn test_client_message_part() { let input = "PART #chan :Pokasiki !!!"; let expected = ClientMessage::Part { - chan: Chan::Global("chan".into()), + chan: ChannelName::Global("chan".into()), message: Some("Pokasiki !!!".into()), }; @@ -516,7 +516,7 @@ mod test { fn test_client_message_part_empty() { let input = "PART #chan"; let expected = ClientMessage::Part { - chan: Chan::Global("chan".into()), + chan: ChannelName::Global("chan".into()), message: None, }; @@ -547,8 +547,8 @@ mod test { #[test] fn test_client_chat_history_latest() { let input = "CHATHISTORY LATEST #chan * 10"; - let expected = ClientMessage::Chathistory { - chan: Chan::Global("chan".into()), + let expected = ClientMessage::ChatHistory { + chan: ChannelName::Global("chan".into()), limit: 10, }; diff --git a/crates/proto-irc/src/lib.rs b/crates/proto-irc/src/lib.rs index b2c8a37..3e72861 100644 --- a/crates/proto-irc/src/lib.rs +++ b/crates/proto-irc/src/lib.rs @@ -48,20 +48,20 @@ fn params(input: &str) -> IResult<&str, &str> { } #[derive(Clone, Debug, PartialEq, Eq)] -pub enum Chan { +pub enum ChannelName { /// `#` — network-global channel, available from any server in the network. Global(Str), /// `&` — server-local channel, available only to connections to the same server. Rarely used in practice. Local(Str), } -impl Chan { +impl ChannelName { pub async fn write_async(&self, writer: &mut (impl AsyncWrite + Unpin)) -> std::io::Result<()> { match self { - Chan::Global(name) => { + ChannelName::Global(name) => { writer.write_all(b"#").await?; writer.write_all(name.as_bytes()).await?; } - Chan::Local(name) => { + ChannelName::Local(name) => { writer.write_all(b"&").await?; writer.write_all(name.as_bytes()).await?; } @@ -70,17 +70,17 @@ impl Chan { } } -fn chan(input: &str) -> IResult<&str, Chan> { - fn chan_global(input: &str) -> IResult<&str, Chan> { +fn chan(input: &str) -> IResult<&str, ChannelName> { + fn chan_global(input: &str) -> IResult<&str, ChannelName> { let (input, _) = tag("#")(input)?; let (input, name) = receiver(input)?; - Ok((input, Chan::Global(name.into()))) + Ok((input, ChannelName::Global(name.into()))) } - fn chan_local(input: &str) -> IResult<&str, Chan> { + fn chan_local(input: &str) -> IResult<&str, ChannelName> { let (input, _) = tag("&")(input)?; let (input, name) = receiver(input)?; - Ok((input, Chan::Local(name.into()))) + Ok((input, ChannelName::Local(name.into()))) } alt((chan_global, chan_local))(input) @@ -89,7 +89,7 @@ fn chan(input: &str) -> IResult<&str, Chan> { #[derive(Clone, Debug, PartialEq, Eq)] pub enum Recipient { Nick(Str), - Chan(Chan), + Chan(ChannelName), } impl Recipient { pub async fn write_async(&self, writer: &mut (impl AsyncWrite + Unpin)) -> std::io::Result<()> { @@ -125,7 +125,7 @@ mod test { #[test] fn test_chan_global() { let input = "#testchan"; - let expected = Chan::Global("testchan".into()); + let expected = ChannelName::Global("testchan".into()); let result = chan(input); assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result)); @@ -139,7 +139,7 @@ mod test { #[test] fn test_chan_local() { let input = "&localchan"; - let expected = Chan::Local("localchan".into()); + let expected = ChannelName::Local("localchan".into()); let result = chan(input); assert_matches!(result, Ok((_, result)) => assert_eq!(expected, result)); diff --git a/crates/proto-irc/src/server.rs b/crates/proto-irc/src/server.rs index ac50f72..03479eb 100644 --- a/crates/proto-irc/src/server.rs +++ b/crates/proto-irc/src/server.rs @@ -69,8 +69,8 @@ pub enum ServerMessageBody { target: Recipient, body: Str, }, - Join(Chan), - Part(Chan), + Join(ChannelName), + Part(ChannelName), Error { reason: Str, }, @@ -121,7 +121,7 @@ pub enum ServerMessageBody { }, N332Topic { client: Str, - chat: Chan, + chat: ChannelName, topic: Str, }, /// A reply to a client's [Who](crate::protos::irc::client::ClientMessage::Who) request. @@ -141,12 +141,12 @@ pub enum ServerMessageBody { }, N353NamesReply { client: Str, - chan: Chan, + chan: ChannelName, members: NonEmpty, }, N366NamesReplyEnd { client: Str, - chan: Chan, + chan: ChannelName, }, N431ErrNoNicknameGiven { client: Str, @@ -154,7 +154,7 @@ pub enum ServerMessageBody { }, N474BannedFromChan { client: Str, - chan: Chan, + chan: ChannelName, message: Str, }, N502UsersDontMatch {