Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions deltachat-rpc-client/tests/test_iroh_webxdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,7 @@ def test_realtime_large_webxdc(acfactory, path_to_large_webxdc):
ac1_ac2_chat = ac1.create_chat(ac2)
ac1_webxdc_msg = ac1_ac2_chat.send_message(text="realtime check", file=path_to_large_webxdc)

# Receive pre-message.
ac2_webxdc_msg = ac2.wait_for_incoming_msg()

# Receive post-message.
ac2_webxdc_msg = ac2.wait_for_msg(EventType.MSGS_CHANGED)

ac2_webxdc_msg.send_webxdc_realtime_advertisement()
event = ac1.wait_for_event(EventType.WEBXDC_REALTIME_ADVERTISEMENT_RECEIVED)
assert event.msg_id == ac1_webxdc_msg.id
10 changes: 0 additions & 10 deletions deltachat-rpc-client/tests/test_something.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,10 +873,6 @@ def test_delete_fully_downloaded_msg(acfactory, tmp_path, direct_imap):

msg = bob.wait_for_incoming_msg()
msg_snapshot = msg.get_snapshot()
assert msg_snapshot.download_state == DownloadState.AVAILABLE
msgs_changed_event = bob.wait_for_msgs_changed_event()
assert msgs_changed_event.msg_id == msg.id
msg_snapshot = msg.get_snapshot()
assert msg_snapshot.download_state == DownloadState.DONE

bob_direct_imap = direct_imap(bob)
Expand Down Expand Up @@ -907,10 +903,6 @@ def test_imap_autodelete_fully_downloaded_msg(acfactory, tmp_path, direct_imap):

msg = bob.wait_for_incoming_msg()
msg_snapshot = msg.get_snapshot()
assert msg_snapshot.download_state == DownloadState.AVAILABLE
msgs_changed_event = bob.wait_for_msgs_changed_event()
assert msgs_changed_event.msg_id == msg.id
msg_snapshot = msg.get_snapshot()
assert msg_snapshot.download_state == DownloadState.DONE

bob_direct_imap = direct_imap(bob)
Expand Down Expand Up @@ -1389,7 +1381,5 @@ def test_large_message(acfactory) -> None:
)

msg = bob.wait_for_incoming_msg()
msgs_changed_event = bob.wait_for_msgs_changed_event()
assert msg.id == msgs_changed_event.msg_id
snapshot = msg.get_snapshot()
assert snapshot.text == "Hello World, this message is bigger than 5 bytes"
22 changes: 11 additions & 11 deletions src/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ SELECT id, rfc724_mid, pre_rfc724_mid, timestamp, ?, 1 FROM msgs WHERE chat_id=?
context.emit_event(EventType::ChatDeleted { chat_id: self });
context.emit_msgs_changed_without_ids();

context.scheduler.interrupt_inbox().await;
if let Some(id) = sync_id {
self::sync(context, id, SyncAction::Delete)
.await
Expand All @@ -703,8 +704,6 @@ SELECT id, rfc724_mid, pre_rfc724_mid, timestamp, ?, 1 FROM msgs WHERE chat_id=?
context
.set_config_internal(Config::LastHousekeeping, None)
.await?;
context.scheduler.interrupt_smtp().await;

Ok(())
}

Expand Down Expand Up @@ -2987,22 +2986,23 @@ WHERE id=?
)?;
for recipients_chunk in recipients.chunks(chunk_size) {
let recipients_chunk = recipients_chunk.join(" ");
if let Some(pre_msg) = &rendered_pre_msg {
let row_id = stmt.execute((
&pre_msg.rfc724_mid,
&recipients_chunk,
&pre_msg.message,
msg.id,
))?;
row_ids.push(row_id.try_into()?);
}
let row_id = stmt.execute((
&rendered_msg.rfc724_mid,
&recipients_chunk,
&rendered_msg.message,
msg.id,
))?;
row_ids.push(row_id.try_into()?);
let Some(pre_msg) = &rendered_pre_msg else {
continue;
};
let row_id = stmt.execute((
&pre_msg.rfc724_mid,
&recipients_chunk,
&pre_msg.message,
msg.id,
))?;
row_ids.push(row_id.try_into()?);
}
Ok(row_ids)
};
Expand Down
19 changes: 15 additions & 4 deletions src/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::context::Context;
use crate::imap::session::Session;
use crate::log::warn;
use crate::message::{self, Message, MsgId, rfc724_mid_exists};
use crate::tools;
use crate::{EventType, chatlist_events, ephemeral};

pub(crate) mod post_msg_metadata;
Expand Down Expand Up @@ -323,12 +324,22 @@ pub(crate) async fn download_known_post_messages_without_pre_message(
context: &Context,
session: &mut Session,
) -> Result<()> {
const PRE_MSG_WAIT_TIME: i64 = 30;
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compat problem: extra traffic for older versions

let now = tools::time();
let rfc724_mids = context
.sql
.query_map_vec("SELECT rfc724_mid FROM available_post_msgs", (), |row| {
let rfc724_mid: String = row.get(0)?;
Ok(rfc724_mid)
})
.query_map_vec(
"
SELECT rfc724_mid FROM available_post_msgs
WHERE timestamp<=? OR timestamp>?
ORDER BY timestamp, rowid
",
(now.saturating_sub(PRE_MSG_WAIT_TIME), now),
|row| {
let rfc724_mid: String = row.get(0)?;
Ok(rfc724_mid)
},
)
.await?;
for rfc724_mid in &rfc724_mids {
if msg_is_downloaded_for(context, rfc724_mid).await? {
Expand Down
11 changes: 6 additions & 5 deletions src/imap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ impl Imap {
let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await;

let mut uids_fetch: Vec<u32> = Vec::new();
let mut available_post_msgs: Vec<String> = Vec::new();
let mut available_post_msgs: Vec<(String, i64)> = Vec::new();
let mut download_later: Vec<String> = Vec::new();
let mut uid_message_ids = BTreeMap::new();
let mut largest_uid_skipped = None;
Expand Down Expand Up @@ -696,7 +696,7 @@ impl Imap {
.is_some()
{
info!(context, "{message_id:?} is a post-message.");
available_post_msgs.push(message_id.clone());
available_post_msgs.push((message_id.clone(), time()));

let is_bot = context.get_config_bool(Config::Bot).await?;
if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit)
Expand Down Expand Up @@ -800,9 +800,10 @@ impl Imap {
download_later.len(),
);
let trans_fn = |t: &mut rusqlite::Transaction| {
let mut stmt = t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?)")?;
for rfc724_mid in available_post_msgs {
stmt.execute((rfc724_mid,))
let mut stmt =
t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?,?)")?;
for (rfc724_mid, ts) in available_post_msgs {
stmt.execute((rfc724_mid, ts))
.context("INSERT OR IGNORE INTO available_post_msgs")?;
}
let mut stmt =
Expand Down
30 changes: 13 additions & 17 deletions src/mimefactory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1723,23 +1723,19 @@ impl MimeFactory {

let footer = if is_reaction { "" } else { &self.selfstatus };

let message_text = if self.pre_message_mode == PreMessageMode::Post {
"".to_string()
} else {
format!(
"{}{}{}{}{}{}",
fwdhint.unwrap_or_default(),
quoted_text.unwrap_or_default(),
escape_message_footer_marks(final_text),
if !final_text.is_empty() && !footer.is_empty() {
"\r\n\r\n"
} else {
""
},
if !footer.is_empty() { "-- \r\n" } else { "" },
footer
)
};
let message_text = format!(
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compat problem: duplicate text for ancient versions

"{}{}{}{}{}{}",
fwdhint.unwrap_or_default(),
quoted_text.unwrap_or_default(),
escape_message_footer_marks(final_text),
if !final_text.is_empty() && !footer.is_empty() {
"\r\n\r\n"
} else {
""
},
if !footer.is_empty() { "-- \r\n" } else { "" },
footer
);

let mut main_part = MimePart::new("text/plain", message_text);
if is_reaction {
Expand Down
51 changes: 29 additions & 22 deletions src/receive_imf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,18 +533,10 @@ pub(crate) async fn receive_imf_inner(
"Receiving message {rfc724_mid_orig:?}, seen={seen}...",
);

// These checks must be done before processing of SecureJoin and other special messages.
if mime_parser.pre_message == mimeparser::PreMessageMode::Post {
// Post-Message just replaces the attachment and modifies Params, not the whole message.
// This is done in the `handle_post_message` method.
} else if let Some(msg_id) = message::rfc724_mid_exists(context, rfc724_mid_orig).await? {
info!(
context,
"Message {rfc724_mid} is already in some chat or deleted."
);
if mime_parser.incoming {
return Ok(None);
}
let msg_id = message::rfc724_mid_exists(context, rfc724_mid_orig).await?;
if let Some(msg_id) = msg_id
&& !mime_parser.incoming
{
// For the case if we missed a successful SMTP response. Be optimistic that the message is
// delivered also.
let self_addr = context.get_primary_self_addr().await?;
Expand All @@ -559,6 +551,16 @@ pub(crate) async fn receive_imf_inner(
if !msg_has_pending_smtp_job(context, msg_id).await? {
msg_id.set_delivered(context).await?;
}
}
// These checks must be done before processing of SecureJoin and other special messages.
if mime_parser.pre_message == mimeparser::PreMessageMode::Post {
// Post-Message just replaces the attachment and modifies Params, not the whole message.
// This is done in the `update_from_post_msg` method.
} else if msg_id.is_some() {
info!(
context,
"Message {rfc724_mid} is already in some chat or deleted."
);
return Ok(None);
}

Expand Down Expand Up @@ -2080,7 +2082,7 @@ async fn add_parts(
}

handle_edit_delete(context, mime_parser, from_id).await?;
handle_post_message(context, mime_parser, from_id, state).await?;
update_from_post_msg(context, mime_parser, from_id, state).await?;

if mime_parser.is_system_message == SystemMessage::CallAccepted
|| mime_parser.is_system_message == SystemMessage::CallEnded
Expand Down Expand Up @@ -2274,8 +2276,7 @@ INSERT INTO msgs

// Maybe set logging xdc and add gossip topics for webxdcs.
for (part, msg_id) in mime_parser.parts.iter().zip(&created_db_entries) {
if mime_parser.pre_message != PreMessageMode::Post
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compat problem: missing iroh topic id

&& part.typ == Viewtype::Webxdc
if part.typ == Viewtype::Webxdc
&& let Some(topic) = mime_parser.get_header(HeaderDef::IrohGossipTopic)
{
let topic = iroh_topic_from_str(topic)?;
Expand Down Expand Up @@ -2424,7 +2425,7 @@ async fn handle_edit_delete(
Ok(())
}

async fn handle_post_message(
async fn update_from_post_msg(
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided to rename it to underline that only an already existing message is updated, to avoid more bugs caused by not fully handling post-messages

context: &Context,
mime_parser: &MimeMessage,
from_id: ContactId,
Expand All @@ -2442,15 +2443,15 @@ async fn handle_post_message(
let Some(msg_id) = message::rfc724_mid_exists(context, &rfc724_mid).await? else {
warn!(
context,
"handle_post_message: {rfc724_mid}: Database entry does not exist."
"update_from_post_msg: {rfc724_mid}: Database entry does not exist."
);
return Ok(());
};
let Some(original_msg) = Message::load_from_db_optional(context, msg_id).await? else {
// else: message is processed like a normal message
warn!(
context,
"handle_post_message: {rfc724_mid}: Pre-message was not downloaded yet so treat as normal message."
"update_from_post_msg: {rfc724_mid}: Pre-message was not downloaded yet so treat as normal message."
);
return Ok(());
};
Expand All @@ -2461,22 +2462,25 @@ async fn handle_post_message(
// Do nothing if safety checks fail, the worst case is the message modifies the chat if the
// sender is a member.
if from_id != original_msg.from_id {
warn!(context, "handle_post_message: {rfc724_mid}: Bad sender.");
warn!(context, "update_from_post_msg: {rfc724_mid}: Bad sender.");
return Ok(());
}
let post_msg_showpadlock = part
.param
.get_bool(Param::GuaranteeE2ee)
.unwrap_or_default();
if !post_msg_showpadlock && original_msg.get_showpadlock() {
warn!(context, "handle_post_message: {rfc724_mid}: Not encrypted.");
warn!(
context,
"update_from_post_msg: {rfc724_mid}: Not encrypted."
);
return Ok(());
}

if !part.typ.has_file() {
warn!(
context,
"handle_post_message: {rfc724_mid}: First mime part's message-viewtype has no file."
"update_from_post_msg: {rfc724_mid}: First mime part's message-viewtype has no file."
);
return Ok(());
}
Expand Down Expand Up @@ -2507,7 +2511,10 @@ WHERE id=?
part.typ,
part.bytes as isize,
part.error.as_deref().unwrap_or_default(),
state,
match mime_parser.incoming {
true => state,
false => MessageState::Undefined,
},
DownloadState::Done as u32,
original_msg.id,
),
Expand Down
22 changes: 11 additions & 11 deletions src/receive_imf/receive_imf_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5692,27 +5692,27 @@ async fn test_mark_message_as_delivered_only_after_sent_out_fully() -> Result<()
.await
.unwrap();

let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await;
assert_eq!(msg_id, pre_msg_id);
assert!(pre_msg_payload.len() < file_bytes.len());
let (post_msg_id, post_msg_payload) = first_row_in_smtp_queue(alice).await;
assert_eq!(msg_id, post_msg_id);
assert!(post_msg_payload.len() > file_bytes.len());

assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
// Alice receives her own pre-message because of bcc_self
// Alice receives her own post-message because of bcc_self
// This should not yet mark the message as delivered,
// because not everything was sent,
// but it does remove the pre-message from the SMTP queue
receive_imf(alice, pre_msg_payload.as_bytes(), false).await?;
// but it does remove the post-message from the SMTP queue.
receive_imf(alice, post_msg_payload.as_bytes(), false).await?;
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);

let (post_msg_id, post_msg_payload) = first_row_in_smtp_queue(alice).await;
assert_eq!(msg_id, post_msg_id);
assert!(post_msg_payload.len() > file_bytes.len());
let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await;
assert_eq!(msg_id, pre_msg_id);
assert!(pre_msg_payload.len() < file_bytes.len());

assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending);
// Alice receives her own post-message because of bcc_self
// Alice receives her own pre-message because of bcc_self
// This should now mark the message as delivered,
// because everything was sent by now.
receive_imf(alice, post_msg_payload.as_bytes(), false).await?;
receive_imf(alice, pre_msg_payload.as_bytes(), false).await?;
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutDelivered);

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/smtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ pub(crate) async fn send_msg_to_smtp(
if retries > 6 {
context
.sql
.execute("DELETE FROM smtp WHERE id=?", (rowid,))
.execute("DELETE FROM smtp WHERE msg_id=?", (msg_id,))
.await
.context("Failed to remove message with exceeded retry limit from smtp table")?;
if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {
Expand Down
13 changes: 13 additions & 0 deletions src/sql/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2434,6 +2434,19 @@ UPDATE msgs SET state=24 WHERE state=18; -- Change OutPreparing to OutFailed.
.await?;
}

inc_and_check(&mut migration_version, 154)?;
if dbversion < migration_version {
sql.execute_migration(
"
CREATE INDEX smtp_index_msg_id ON smtp (msg_id, id);
ALTER TABLE available_post_msgs ADD COLUMN timestamp INTEGER DEFAULT 0 NOT NULL;
CREATE INDEX available_post_msgs_timestamp ON available_post_msgs (timestamp);
",
migration_version,
)
.await?;
}

let new_version = sql
.get_raw_config_int(VERSION_CFG)
.await?
Expand Down
Loading
Loading