Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions src/chat.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! # Chat module.

use std::cmp;
use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt;
use std::io::Cursor;
use std::marker::Sync;
Expand Down Expand Up @@ -3325,39 +3325,41 @@ pub(crate) async fn mark_old_messages_as_noticed(
return Ok(());
}

let mut msgs_by_chat: HashMap<ChatId, ReceivedMsg> = HashMap::new();
let mut updated_chats = BTreeMap::new();
for msg in msgs {
let chat_id = msg.chat_id;
if let Some(existing_msg) = msgs_by_chat.get(&chat_id) {
if msg.sort_timestamp > existing_msg.sort_timestamp {
msgs_by_chat.insert(chat_id, msg);
}
} else {
msgs_by_chat.insert(chat_id, msg);
}
let Some(&msg_id) = msg.msg_ids.last() else {
continue;
};
updated_chats
.entry(msg.chat_id)
.and_modify(|val| *val = cmp::max(*val, (msg.sort_timestamp, msg_id)))
.or_insert((msg.sort_timestamp, msg_id));
}

let changed_chats = context
.sql
.transaction(|transaction| {
let mut changed_chats = Vec::new();
for (_, msg) in msgs_by_chat {
for (chat_id, (timestamp, msg_id)) in updated_chats {
let changed_rows = transaction.execute(
// Do the same as in receive_imf_inner().
"UPDATE msgs
SET state=?
WHERE state=?
AND hidden=0
AND chat_id=?
AND timestamp<=?;",
AND timestamp<=?
AND id<?",
(
MessageState::InNoticed,
MessageState::InFresh,
msg.chat_id,
msg.sort_timestamp,
chat_id,
timestamp,
msg_id,
),
)?;
if changed_rows > 0 {
changed_chats.push(msg.chat_id);
changed_chats.push(chat_id);
}
}
Ok(changed_chats)
Expand Down
8 changes: 7 additions & 1 deletion src/receive_imf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -982,12 +982,18 @@ UPDATE config SET value=? WHERE keyname='configured_addr' AND value!=?1
context
.sql
.execute(
// Don't mark messages added to the db later as noticed, regardless of
// `timestamp` -- the device which issued the MDN might not have these
// messages at that moment. Still, additionally filter messages by timestamp
// to protect from multi-relay message reordering and overall rely less on
// the server side. We assume that all clocks in the chat are synchronized.
"
UPDATE msgs SET state=? WHERE
state=? AND
hidden=0 AND
chat_id=? AND
(timestamp,id)<(?,?)",
timestamp<=? AND
id<?",
(
MessageState::InNoticed,
MessageState::InFresh,
Expand Down
40 changes: 40 additions & 0 deletions src/receive_imf/receive_imf_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::imap::prefetch_should_download;
use crate::imex::{ImexMode, imex};
use crate::key;
use crate::securejoin::get_securejoin_qr;
use crate::smtp;
use crate::test_utils;
use crate::test_utils::{
TestContext, TestContextManager, alice_keypair, get_chat_msg, mark_as_verified,
Expand Down Expand Up @@ -2724,6 +2725,45 @@ async fn test_read_receipts_dont_unmark_bots() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_self_mdn_vs_delayed_msg() -> Result<()> {
let mut tcm = TestContextManager::new();
let alice = &tcm.alice().await;
let bob1 = &tcm.bob().await;
let bob2 = &tcm.bob().await;

let alice_chat = alice.create_chat(bob1).await;
let sent1 = alice.send_text(alice_chat.id, "1").await;
SystemTime::shift(Duration::from_secs(1));
let sent2 = alice.send_text(alice_chat.id, "2").await;

let msg2_id = bob1.recv_msg(&sent2).await.id;
let msg1_id = bob1.recv_msg(&sent1).await.id;
let bob2_chat_id = bob2.recv_msg(&sent2).await.chat_id;
bob2.recv_msg(&sent1).await;

message::markseen_msgs(bob1, vec![msg2_id]).await?;
assert_eq!(msg1_id.get_state(bob1).await?, MessageState::InFresh);
smtp::queue_mdn(bob1).await?;
let sent = bob1.pop_sent_msg().await;

bob2.recv_msg_trash(&sent).await;
let mut msgs = get_chat_msgs(bob2, bob2_chat_id).await?;
let ChatItem::Message { msg_id } = msgs.pop().unwrap() else {
unreachable!();
};
let msg = Message::load_from_db(bob2, msg_id).await?;
assert_eq!(msg.text, "2");
assert_eq!(msg.state, MessageState::InSeen);
let ChatItem::Message { msg_id } = msgs.pop().unwrap() else {
unreachable!();
};
let msg = Message::load_from_db(bob2, msg_id).await?;
assert_eq!(msg.text, "1");
assert_eq!(msg.state, MessageState::InFresh);
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_gmx_forwarded_msg() -> Result<()> {
let t = TestContext::new_alice().await;
Expand Down
105 changes: 69 additions & 36 deletions src/smtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::config::Config;
use crate::contact::{Contact, ContactId};
use crate::context::Context;
use crate::events::EventType;
use crate::log::{LogExt, warn};
use crate::log::warn;
use crate::message::Message;
use crate::message::{self, MsgId};
use crate::mimefactory::MimeFactory;
Expand Down Expand Up @@ -590,44 +590,77 @@ async fn send_mdn_rfc724_mid(
if context.get_config_bool(Config::BccSelf).await? {
add_self_recipients(context, &mut recipients, encrypted).await?;
}
let recipients: Vec<_> = recipients
.into_iter()
.filter_map(|addr| {
async_smtp::EmailAddress::new(addr.clone())
.with_context(|| format!("Invalid recipient: {addr}"))
.log_err(context)
.ok()
})
.collect();

match smtp_send(context, &recipients, &body, smtp, None).await {
SendResult::Success => {
if !recipients.is_empty() {
info!(context, "Successfully sent MDN for {rfc724_mid}.");
#[cfg(not(test))]
{
use crate::log::LogExt;

let recipients: Vec<_> = recipients
.into_iter()
.filter_map(|addr| {
async_smtp::EmailAddress::new(addr.clone())
.with_context(|| format!("Invalid recipient: {addr}"))
.log_err(context)
.ok()
})
.collect();

match smtp_send(context, &recipients, &body, smtp, None).await {
SendResult::Success => {
if !recipients.is_empty() {
info!(context, "Successfully sent MDN for {rfc724_mid}.");
}
context
.sql
.transaction(|transaction| {
let mut stmt =
transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?;
stmt.execute((rfc724_mid,))?;
for additional_rfc724_mid in additional_rfc724_mids {
stmt.execute((additional_rfc724_mid,))?;
}
Ok(())
})
.await?;
Ok(true)
}
context
.sql
.transaction(|transaction| {
let mut stmt =
transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?;
stmt.execute((rfc724_mid,))?;
for additional_rfc724_mid in additional_rfc724_mids {
stmt.execute((additional_rfc724_mid,))?;
}
Ok(())
})
.await?;
Ok(true)
}
SendResult::Retry => {
info!(
context,
"Temporary SMTP failure while sending an MDN for {rfc724_mid}."
);
Ok(false)
SendResult::Retry => {
info!(
context,
"Temporary SMTP failure while sending an MDN for {rfc724_mid}."
);
Ok(false)
}
SendResult::Failure(err) => Err(err),
}
SendResult::Failure(err) => Err(err),
}
#[cfg(test)]
{
let _ = smtp;
context
.sql
.transaction(|t| {
t.execute(
"INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id)
VALUES (?, ?, ?, ?)",
(rfc724_mid, recipients.join(" "), body, u32::MAX),
)?;
let mut stmt = t.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?;
stmt.execute((rfc724_mid,))?;
for additional_rfc724_mid in additional_rfc724_mids {
stmt.execute((additional_rfc724_mid,))?;
}
Ok(())
})
.await?;
Ok(true)
}
}

#[cfg(test)]
pub(crate) async fn queue_mdn(context: &Context) -> Result<()> {
let queued = send_mdn(context, &mut Smtp::new()).await?;
assert!(queued);
Ok(())
}

/// Tries to send a single MDN. Returns true if more MDNs should be sent.
Expand Down
Loading