diff --git a/tests/federation_room_join_test.go b/tests/federation_room_join_test.go index 03b12ee0..544bbef2 100644 --- a/tests/federation_room_join_test.go +++ b/tests/federation_room_join_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" "net/url" "strings" @@ -574,6 +575,131 @@ func TestSendJoinPartialStateResponse(t *testing.T) { must.HaveInOrder(t, sendJoinResp.ServersInRoom, []string{"hs1"}) } +// This test verifies that events sent into a room between a /make_join and +// /send_join are not lost to the joining server. When an event is created +// during the join handshake, the join event's prev_events (set at make_join +// time) won't reference it, creating two forward extremities. The server +// handling the join should ensure the joining server can discover the missed +// event, for example by sending a follow-up event that references both +// extremities, prompting the joining server to backfill. +// +// See https://github.com/element-hq/synapse/pull/19390 +func TestEventBetweenMakeJoinAndSendJoinIsNotLost(t *testing.T) { + deployment := complement.Deploy(t, 1) + defer deployment.Destroy(t) + + alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{}) + + // We track the message event ID sent between make_join and send_join. + // After send_join, we wait for hs1 to send us either: + // - the message event itself, or + // - any event whose prev_events reference the message (e.g. a dummy event) + var messageEventID string + messageDiscoverableWaiter := helpers.NewWaiter() + + srv := federation.NewServer(t, deployment, + federation.HandleKeyRequests(), + ) + srv.UnexpectedRequestsAreErrors = false + + // Custom /send handler: the Complement server won't be in the room until + // send_join completes, so we can't use HandleTransactionRequests (which + // requires the room in srv.rooms). Instead we parse the raw transaction. + srv.Mux().Handle("/_matrix/federation/v1/send/{transactionID}", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + body, _ := io.ReadAll(req.Body) + txn := gjson.ParseBytes(body) + txn.Get("pdus").ForEach(func(_, pdu gjson.Result) bool { + eventID := pdu.Get("event_id").String() + eventType := pdu.Get("type").String() + t.Logf("Received PDU via /send: type=%s id=%s", eventType, eventID) + + if messageEventID == "" { + return true + } + + // Check if this IS the message event (server pushed it directly). + if eventID == messageEventID { + messageDiscoverableWaiter.Finish() + return true + } + + // Check if this event's prev_events reference the message + // (e.g. a dummy event tying the forward extremities together). + pdu.Get("prev_events").ForEach(func(_, prevEvent gjson.Result) bool { + if prevEvent.String() == messageEventID { + messageDiscoverableWaiter.Finish() + return false + } + return true + }) + + return true + }) + w.WriteHeader(200) + w.Write([]byte(`{"pdus":{}}`)) + })).Methods("PUT") + + cancel := srv.Listen() + defer cancel() + + // Alice creates a room on hs1. + roomID := alice.MustCreateRoom(t, map[string]interface{}{ + "preset": "public_chat", + }) + + charlie := srv.UserID("charlie") + origin := srv.ServerName() + fedClient := srv.FederationClient(deployment) + + // Step 1: make_join, hs1 returns a join event template whose prev_events + // reflect the current room DAG tips. + makeJoinResp, err := fedClient.MakeJoin( + context.Background(), origin, + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), + roomID, charlie, + ) + must.NotError(t, "MakeJoin", err) + + // Step 2: Alice sends a message on hs1. This advances the DAG past the + // point captured by make_join's prev_events. The Complement server is not + // yet in the room, so it won't receive this event via normal federation. + messageEventID = alice.SendEventSynced(t, roomID, b.Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": "Message sent between make_join and send_join", + }, + }) + t.Logf("Alice sent message %s between make_join and send_join", messageEventID) + + // Step 3: Build and sign the join event, then send_join. + // The join event's prev_events are from step 1 (before the message), + // so persisting it on hs1 creates two forward extremities: the message + // and the join. + verImpl, err := gomatrixserverlib.GetRoomVersion(makeJoinResp.RoomVersion) + must.NotError(t, "GetRoomVersion", err) + eb := verImpl.NewEventBuilderFromProtoEvent(&makeJoinResp.JoinEvent) + joinEvent, err := eb.Build(time.Now(), srv.ServerName(), srv.KeyID, srv.Priv) + must.NotError(t, "Build join event", err) + + _, err = fedClient.SendJoin( + context.Background(), origin, + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), + joinEvent, + ) + must.NotError(t, "SendJoin", err) + + // Step 4: hs1 should make the missed message discoverable to the joining + // server. We accept either receiving the message event directly, or + // receiving any event whose prev_events reference it (allowing the + // joining server to backfill). + messageDiscoverableWaiter.Waitf(t, 5*time.Second, + "Timed out waiting for message event %s to become discoverable — "+ + "the event sent between make_join and send_join was lost to the "+ + "joining server", messageEventID, + ) +} + // given an event JSON, return the type and state_key, joined with a "|" func typeAndStateKeyForEvent(result gjson.Result) string { return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|")