diff --git a/src/chat.rs b/src/chat.rs index 11197d2616..1522efc916 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -1,7 +1,7 @@ //! # Chat module. use std::cmp; -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::fmt; use std::io::Cursor; use std::marker::Sync; @@ -3325,39 +3325,41 @@ pub(crate) async fn mark_old_messages_as_noticed( return Ok(()); } - let mut msgs_by_chat: HashMap = HashMap::new(); + let mut updated_chats = BTreeMap::new(); for msg in msgs { - let chat_id = msg.chat_id; - if let Some(existing_msg) = msgs_by_chat.get(&chat_id) { - if msg.sort_timestamp > existing_msg.sort_timestamp { - msgs_by_chat.insert(chat_id, msg); - } - } else { - msgs_by_chat.insert(chat_id, msg); - } + let Some(&msg_id) = msg.msg_ids.last() else { + continue; + }; + updated_chats + .entry(msg.chat_id) + .and_modify(|val| *val = cmp::max(*val, (msg.sort_timestamp, msg_id))) + .or_insert((msg.sort_timestamp, msg_id)); } let changed_chats = context .sql .transaction(|transaction| { let mut changed_chats = Vec::new(); - for (_, msg) in msgs_by_chat { + for (chat_id, (timestamp, msg_id)) in updated_chats { let changed_rows = transaction.execute( + // Do the same as in receive_imf_inner(). "UPDATE msgs SET state=? WHERE state=? AND hidden=0 AND chat_id=? - AND timestamp<=?;", + AND timestamp<=? + AND id 0 { - changed_chats.push(msg.chat_id); + changed_chats.push(chat_id); } } Ok(changed_chats) diff --git a/src/receive_imf.rs b/src/receive_imf.rs index c1bc3e491d..18dccc1a26 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -982,12 +982,18 @@ UPDATE config SET value=? WHERE keyname='configured_addr' AND value!=?1 context .sql .execute( + // Don't mark messages added to the db later as noticed, regardless of + // `timestamp` -- the device which issued the MDN might not have these + // messages at that moment. Still, additionally filter messages by timestamp + // to protect from multi-relay message reordering and overall rely less on + // the server side. We assume that all clocks in the chat are synchronized. " UPDATE msgs SET state=? WHERE state=? AND hidden=0 AND chat_id=? AND - (timestamp,id)<(?,?)", + timestamp<=? AND + id Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_self_mdn_vs_delayed_msg() -> Result<()> { + let mut tcm = TestContextManager::new(); + let alice = &tcm.alice().await; + let bob1 = &tcm.bob().await; + let bob2 = &tcm.bob().await; + + let alice_chat = alice.create_chat(bob1).await; + let sent1 = alice.send_text(alice_chat.id, "1").await; + SystemTime::shift(Duration::from_secs(1)); + let sent2 = alice.send_text(alice_chat.id, "2").await; + + let msg2_id = bob1.recv_msg(&sent2).await.id; + let msg1_id = bob1.recv_msg(&sent1).await.id; + let bob2_chat_id = bob2.recv_msg(&sent2).await.chat_id; + bob2.recv_msg(&sent1).await; + + message::markseen_msgs(bob1, vec![msg2_id]).await?; + assert_eq!(msg1_id.get_state(bob1).await?, MessageState::InFresh); + smtp::queue_mdn(bob1).await?; + let sent = bob1.pop_sent_msg().await; + + bob2.recv_msg_trash(&sent).await; + let mut msgs = get_chat_msgs(bob2, bob2_chat_id).await?; + let ChatItem::Message { msg_id } = msgs.pop().unwrap() else { + unreachable!(); + }; + let msg = Message::load_from_db(bob2, msg_id).await?; + assert_eq!(msg.text, "2"); + assert_eq!(msg.state, MessageState::InSeen); + let ChatItem::Message { msg_id } = msgs.pop().unwrap() else { + unreachable!(); + }; + let msg = Message::load_from_db(bob2, msg_id).await?; + assert_eq!(msg.text, "1"); + assert_eq!(msg.state, MessageState::InFresh); + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_gmx_forwarded_msg() -> Result<()> { let t = TestContext::new_alice().await; diff --git a/src/smtp.rs b/src/smtp.rs index 020c575fd3..76deb73f87 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -13,7 +13,7 @@ use crate::config::Config; use crate::contact::{Contact, ContactId}; use crate::context::Context; use crate::events::EventType; -use crate::log::{LogExt, warn}; +use crate::log::warn; use crate::message::Message; use crate::message::{self, MsgId}; use crate::mimefactory::MimeFactory; @@ -590,44 +590,77 @@ async fn send_mdn_rfc724_mid( if context.get_config_bool(Config::BccSelf).await? { add_self_recipients(context, &mut recipients, encrypted).await?; } - let recipients: Vec<_> = recipients - .into_iter() - .filter_map(|addr| { - async_smtp::EmailAddress::new(addr.clone()) - .with_context(|| format!("Invalid recipient: {addr}")) - .log_err(context) - .ok() - }) - .collect(); - - match smtp_send(context, &recipients, &body, smtp, None).await { - SendResult::Success => { - if !recipients.is_empty() { - info!(context, "Successfully sent MDN for {rfc724_mid}."); + #[cfg(not(test))] + { + use crate::log::LogExt; + + let recipients: Vec<_> = recipients + .into_iter() + .filter_map(|addr| { + async_smtp::EmailAddress::new(addr.clone()) + .with_context(|| format!("Invalid recipient: {addr}")) + .log_err(context) + .ok() + }) + .collect(); + + match smtp_send(context, &recipients, &body, smtp, None).await { + SendResult::Success => { + if !recipients.is_empty() { + info!(context, "Successfully sent MDN for {rfc724_mid}."); + } + context + .sql + .transaction(|transaction| { + let mut stmt = + transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?; + stmt.execute((rfc724_mid,))?; + for additional_rfc724_mid in additional_rfc724_mids { + stmt.execute((additional_rfc724_mid,))?; + } + Ok(()) + }) + .await?; + Ok(true) } - context - .sql - .transaction(|transaction| { - let mut stmt = - transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?; - stmt.execute((rfc724_mid,))?; - for additional_rfc724_mid in additional_rfc724_mids { - stmt.execute((additional_rfc724_mid,))?; - } - Ok(()) - }) - .await?; - Ok(true) - } - SendResult::Retry => { - info!( - context, - "Temporary SMTP failure while sending an MDN for {rfc724_mid}." - ); - Ok(false) + SendResult::Retry => { + info!( + context, + "Temporary SMTP failure while sending an MDN for {rfc724_mid}." + ); + Ok(false) + } + SendResult::Failure(err) => Err(err), } - SendResult::Failure(err) => Err(err), } + #[cfg(test)] + { + let _ = smtp; + context + .sql + .transaction(|t| { + t.execute( + "INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id) + VALUES (?, ?, ?, ?)", + (rfc724_mid, recipients.join(" "), body, u32::MAX), + )?; + let mut stmt = t.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?; + stmt.execute((rfc724_mid,))?; + for additional_rfc724_mid in additional_rfc724_mids { + stmt.execute((additional_rfc724_mid,))?; + } + Ok(()) + }) + .await?; + Ok(true) + } +} + +#[cfg(test)] +pub(crate) async fn queue_mdn(context: &Context) -> Result<()> { + let queued = send_mdn(context, &mut Smtp::new()).await?; + assert!(queued); + Ok(()) } /// Tries to send a single MDN. Returns true if more MDNs should be sent.