diff --git a/src/http-api.mjs b/src/http-api.mjs index ff6d77a..1c34575 100644 --- a/src/http-api.mjs +++ b/src/http-api.mjs @@ -370,7 +370,7 @@ HttpAPI.prototype.sendToDevice = async function (eventType, txnId = randomUUID() }).json() } -HttpAPI.prototype.sync = async function (since, filter, timeout = POLL_TIMEOUT) { +HttpAPI.prototype.sync = async function (since, filter, timeout = POLL_TIMEOUT, signal) { const buildSearchParams = (since, filter, timeout) => { const params = { timeout @@ -380,9 +380,9 @@ HttpAPI.prototype.sync = async function (since, filter, timeout = POLL_TIMEOUT) if (f) params.filter = f return params } - return this.client.get('v3/sync', { - searchParams: buildSearchParams(since, filter, timeout) - }).json() + const options = { searchParams: buildSearchParams(since, filter, timeout) } + if (signal) options.signal = signal + return this.client.get('v3/sync', options).json() } /** diff --git a/src/project.mjs b/src/project.mjs index ea72819..1d8c932 100644 --- a/src/project.mjs +++ b/src/project.mjs @@ -140,8 +140,23 @@ Project.prototype.joinLayer = async function (layerId) { const upstreamId = this.idMapping.get(layerId) || (Base64.isValid(layerId) ? Base64.decode(layerId) : layerId) + // 1. Add the upstream (Matrix) room ID to the filter BEFORE joining + // so the next sync poll includes it. + this.idMapping.remember(upstreamId, upstreamId) + this.pendingContent.add(upstreamId) + + // 2. Restart the sync long-poll so it picks up the updated rooms filter + // immediately. The restarted poll will be waiting when the join event + // arrives on the server. + this.timelineAPI.restartSync() + + // 3. NOW perform the actual join — the sync poll already includes this room. await this.structureAPI.join(upstreamId) const room = await this.structureAPI.getLayer(upstreamId) + + // 4. Replace the temporary self-mapping with the real ODIN↔Matrix mapping. + // room.room_id === upstreamId, so pendingContent stays valid. + this.idMapping.forget(upstreamId) this.idMapping.remember(room.id, room.room_id) // Register encryption if applicable (needed before content can be decrypted) @@ -149,10 +164,6 @@ Project.prototype.joinLayer = async function (layerId) { await this.crypto.registerRoom(room.room_id) } - // Mark for sync-gated content fetch: content will be loaded once the room - // appears in a sync response, not immediately after join. - this.pendingContent.add(room.room_id) - const layer = {...room} layer.role = { self: room.powerlevel.self.name, diff --git a/src/timeline-api.mjs b/src/timeline-api.mjs index ccfab59..3ba0c01 100644 --- a/src/timeline-api.mjs +++ b/src/timeline-api.mjs @@ -97,7 +97,7 @@ TimelineAPI.prototype.content = async function (roomId, filter, _from) { } -TimelineAPI.prototype.syncTimeline = async function(since, filter, timeout = 0) { +TimelineAPI.prototype.syncTimeline = async function(since, filter, timeout = 0, signal) { /* We want the complete timeline for all rooms that we have already joined. Thus we get the most recent events and then iterate over partial results until we filled the gap. The order of the events shall be @@ -117,7 +117,7 @@ TimelineAPI.prototype.syncTimeline = async function(since, filter, timeout = 0) // for catching up const jobs = {} - const syncResult = await this.httpApi.sync(since, effectiveFilter, timeout) + const syncResult = await this.httpApi.sync(since, effectiveFilter, timeout, signal) // Feed crypto state from sync response if (this.onSyncResponse) { @@ -263,26 +263,55 @@ TimelineAPI.prototype.catchUp = async function (roomId, lastKnownStreamToken, cu } -TimelineAPI.prototype.stream = async function* (since, filterProvider, signal = (new AbortController()).signal) { - +/** + * Abort the current long-poll sync request so that the stream restarts + * immediately with an updated filter (e.g. after joinLayer added a room + * to idMapping). The stream loop catches the abort and re-enters the + * next iteration without incrementing the retry counter. + */ +TimelineAPI.prototype.restartSync = function () { + if (this._syncAbort) { + this._syncAbort.abort() + this._syncAbort = null + } +} +TimelineAPI.prototype.stream = async function* (since, filterProvider, signal = (new AbortController()).signal) { let streamToken = since let retryCounter = 0 while (!signal.aborted) { + // Each iteration gets its own AbortController so that restartSync() + // can cancel the current long-poll without stopping the stream. + const iterationAbort = new AbortController() + this._syncAbort = iterationAbort + + // Forward the outer lifecycle signal: if the stream is stopped, + // also abort the current request. + const onOuterAbort = () => iterationAbort.abort() + signal.addEventListener('abort', onOuterAbort, { once: true }) + try { await chill(retryCounter) const filter = filterProvider ? filterProvider() : undefined - const syncResult = await this.syncTimeline(streamToken, filter, DEFAULT_POLL_TIMEOUT, signal) + const syncResult = await this.syncTimeline(streamToken, filter, DEFAULT_POLL_TIMEOUT, iterationAbort.signal) retryCounter = 0 if (streamToken !== syncResult.next_batch) { streamToken = syncResult.next_batch yield syncResult } } catch (error) { + if (iterationAbort.signal.aborted && !signal.aborted) { + // restartSync() was called — not an error, just restart immediately + getLogger().debug('Sync restarted (filter update)') + continue + } retryCounter++ yield new Error(error) + } finally { + signal.removeEventListener('abort', onOuterAbort) + this._syncAbort = null } } } diff --git a/test-e2e/project-join-content.test.mjs b/test-e2e/project-join-content.test.mjs index f11d743..57c05c9 100644 --- a/test-e2e/project-join-content.test.mjs +++ b/test-e2e/project-join-content.test.mjs @@ -174,4 +174,91 @@ describe('Project Join Content (E2E)', function () { `Operation at index ${i} out of order`) } }) + + it('sync-gated received() delivers content after joinLayer() restarts sync', async function () { + // Fresh users for a clean sync state + const alice2Creds = await registerUser(`alice2_pjc_${suffix}`) + const bob2Creds = await registerUser(`bob2_pjc_${suffix}`) + const alice2Client = MatrixClient({ ...alice2Creds, db: createDB() }) + const bob2Client = MatrixClient({ ...bob2Creds, db: createDB() }) + + const projectId2 = `project2-${suffix}` + const layerId2 = `layer2-${suffix}` + + // Alice: create project, layer, post content, invite Bob + const alice2ProjectList = await alice2Client.projectList(alice2Creds) + const shared = await alice2ProjectList.share(projectId2, 'Sync-Gate Test', 'E2E') + + const alice2Project = await alice2Client.project(alice2Creds) + await alice2Project.hydrate({ id: projectId2, upstreamId: shared.upstreamId }) + await alice2Project.shareLayer(layerId2, 'Test Layer', '') + await alice2Project.post(layerId2, testOps) + await waitForQueueDrain(alice2Project) + + await alice2ProjectList.invite(projectId2, bob2Creds.user_id) + + // Bob: receive invitation + const bob2ProjectList = await bob2Client.projectList(bob2Creds) + + const invitation = await new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error('No invite received')), 15000) + bob2ProjectList.start(null, { + error: async (err) => { clearTimeout(timer); reject(err) }, + invited: async (project) => { + clearTimeout(timer) + await bob2ProjectList.stop() + resolve(project) + } + }) + }) + + // Bob: join project, hydrate + await bob2ProjectList.join(invitation.id) + const bob2Project = await bob2Client.project(bob2Creds) + const structure = await bob2Project.hydrate({ + id: invitation.id, + upstreamId: bob2ProjectList.wellKnown.get(invitation.id) + }) + assert.ok(structure.invitations.length > 0) + + // Bob: start project stream FIRST, then join layer + // joinLayer() should restart the sync so the room appears immediately + const receivedOps = [] + let receivedResolve + const receivedPromise = new Promise(resolve => { receivedResolve = resolve }) + const receivedTimeout = setTimeout(() => receivedResolve(), 15000) + + bob2Project.start(undefined, { + received: async ({ id, operations }) => { + receivedOps.push(...operations) + if (receivedOps.length >= testOps.length) { + clearTimeout(receivedTimeout) + receivedResolve() + } + }, + error: async (err) => console.error('Stream error:', err) + }) + + // Give stream a moment to start its first long-poll + await new Promise(resolve => setTimeout(resolve, 1000)) + + // Bob joins the layer — this should restart the sync + const inv = structure.invitations[0] + await bob2Project.joinLayer(inv.id) + + // Wait for content via received() + await receivedPromise + await bob2Project.stop() + await alice2Project.commandAPI.stop() + + console.log(` Sync-gated received() delivered ${receivedOps.length} operations`) + + assert.equal(receivedOps.length, testOps.length, + `Expected ${testOps.length} operations via received(), got ${receivedOps.length}`) + + for (let i = 0; i < testOps.length; i++) { + assert.deepStrictEqual(receivedOps[i], testOps[i], + `Operation at index ${i} out of order`) + } + }) }) diff --git a/test/sync-gated-content.test.mjs b/test/sync-gated-content.test.mjs index 728586f..af3f784 100644 --- a/test/sync-gated-content.test.mjs +++ b/test/sync-gated-content.test.mjs @@ -50,6 +50,8 @@ const createTimelineAPI = ({ syncChunks = [], contentResult = { events: [] }, cr } }, + restartSync: () => {}, + // For assertions _contentCallCount: 0, _lastContentFilter: null,