Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/http-api.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ HttpAPI.prototype.sendToDevice = async function (eventType, txnId = randomUUID()
}).json()
}

HttpAPI.prototype.sync = async function (since, filter, timeout = POLL_TIMEOUT) {
HttpAPI.prototype.sync = async function (since, filter, timeout = POLL_TIMEOUT, signal) {
const buildSearchParams = (since, filter, timeout) => {
const params = {
timeout
Expand All @@ -380,9 +380,9 @@ HttpAPI.prototype.sync = async function (since, filter, timeout = POLL_TIMEOUT)
if (f) params.filter = f
return params
}
return this.client.get('v3/sync', {
searchParams: buildSearchParams(since, filter, timeout)
}).json()
const options = { searchParams: buildSearchParams(since, filter, timeout) }
if (signal) options.signal = signal
return this.client.get('v3/sync', options).json()
}

/**
Expand Down
19 changes: 15 additions & 4 deletions src/project.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,30 @@ Project.prototype.joinLayer = async function (layerId) {

const upstreamId = this.idMapping.get(layerId) || (Base64.isValid(layerId) ? Base64.decode(layerId) : layerId)

// 1. Add the upstream (Matrix) room ID to the filter BEFORE joining
// so the next sync poll includes it.
this.idMapping.remember(upstreamId, upstreamId)
this.pendingContent.add(upstreamId)

// 2. Restart the sync long-poll so it picks up the updated rooms filter
// immediately. The restarted poll will be waiting when the join event
// arrives on the server.
this.timelineAPI.restartSync()

// 3. NOW perform the actual join — the sync poll already includes this room.
await this.structureAPI.join(upstreamId)
const room = await this.structureAPI.getLayer(upstreamId)

// 4. Replace the temporary self-mapping with the real ODIN↔Matrix mapping.
// room.room_id === upstreamId, so pendingContent stays valid.
this.idMapping.forget(upstreamId)
this.idMapping.remember(room.id, room.room_id)

// Register encryption if applicable (needed before content can be decrypted)
if (this.crypto.isEnabled && room.encryption) {
await this.crypto.registerRoom(room.room_id)
}

// Mark for sync-gated content fetch: content will be loaded once the room
// appears in a sync response, not immediately after join.
this.pendingContent.add(room.room_id)

const layer = {...room}
layer.role = {
self: room.powerlevel.self.name,
Expand Down
39 changes: 34 additions & 5 deletions src/timeline-api.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ TimelineAPI.prototype.content = async function (roomId, filter, _from) {
}


TimelineAPI.prototype.syncTimeline = async function(since, filter, timeout = 0) {
TimelineAPI.prototype.syncTimeline = async function(since, filter, timeout = 0, signal) {
/*
We want the complete timeline for all rooms that we have already joined. Thus we get the most recent
events and then iterate over partial results until we filled the gap. The order of the events shall be
Expand All @@ -117,7 +117,7 @@ TimelineAPI.prototype.syncTimeline = async function(since, filter, timeout = 0)
// for catching up
const jobs = {}

const syncResult = await this.httpApi.sync(since, effectiveFilter, timeout)
const syncResult = await this.httpApi.sync(since, effectiveFilter, timeout, signal)

// Feed crypto state from sync response
if (this.onSyncResponse) {
Expand Down Expand Up @@ -263,26 +263,55 @@ TimelineAPI.prototype.catchUp = async function (roomId, lastKnownStreamToken, cu
}


TimelineAPI.prototype.stream = async function* (since, filterProvider, signal = (new AbortController()).signal) {

/**
* Abort the current long-poll sync request so that the stream restarts
* immediately with an updated filter (e.g. after joinLayer added a room
* to idMapping). The stream loop catches the abort and re-enters the
* next iteration without incrementing the retry counter.
*/
TimelineAPI.prototype.restartSync = function () {
if (this._syncAbort) {
this._syncAbort.abort()
this._syncAbort = null
}
}

TimelineAPI.prototype.stream = async function* (since, filterProvider, signal = (new AbortController()).signal) {

let streamToken = since
let retryCounter = 0

while (!signal.aborted) {
// Each iteration gets its own AbortController so that restartSync()
// can cancel the current long-poll without stopping the stream.
const iterationAbort = new AbortController()
this._syncAbort = iterationAbort

// Forward the outer lifecycle signal: if the stream is stopped,
// also abort the current request.
const onOuterAbort = () => iterationAbort.abort()
signal.addEventListener('abort', onOuterAbort, { once: true })

try {
await chill(retryCounter)
const filter = filterProvider ? filterProvider() : undefined
const syncResult = await this.syncTimeline(streamToken, filter, DEFAULT_POLL_TIMEOUT, signal)
const syncResult = await this.syncTimeline(streamToken, filter, DEFAULT_POLL_TIMEOUT, iterationAbort.signal)
retryCounter = 0
if (streamToken !== syncResult.next_batch) {
streamToken = syncResult.next_batch
yield syncResult
}
} catch (error) {
if (iterationAbort.signal.aborted && !signal.aborted) {
// restartSync() was called — not an error, just restart immediately
getLogger().debug('Sync restarted (filter update)')
continue
}
retryCounter++
yield new Error(error)
} finally {
signal.removeEventListener('abort', onOuterAbort)
this._syncAbort = null
}
}
}
Expand Down
87 changes: 87 additions & 0 deletions test-e2e/project-join-content.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,91 @@ describe('Project Join Content (E2E)', function () {
`Operation at index ${i} out of order`)
}
})

it('sync-gated received() delivers content after joinLayer() restarts sync', async function () {
// Fresh users for a clean sync state
const alice2Creds = await registerUser(`alice2_pjc_${suffix}`)
const bob2Creds = await registerUser(`bob2_pjc_${suffix}`)
const alice2Client = MatrixClient({ ...alice2Creds, db: createDB() })
const bob2Client = MatrixClient({ ...bob2Creds, db: createDB() })

const projectId2 = `project2-${suffix}`
const layerId2 = `layer2-${suffix}`

// Alice: create project, layer, post content, invite Bob
const alice2ProjectList = await alice2Client.projectList(alice2Creds)
const shared = await alice2ProjectList.share(projectId2, 'Sync-Gate Test', 'E2E')

const alice2Project = await alice2Client.project(alice2Creds)
await alice2Project.hydrate({ id: projectId2, upstreamId: shared.upstreamId })
await alice2Project.shareLayer(layerId2, 'Test Layer', '')
await alice2Project.post(layerId2, testOps)
await waitForQueueDrain(alice2Project)

await alice2ProjectList.invite(projectId2, bob2Creds.user_id)

// Bob: receive invitation
const bob2ProjectList = await bob2Client.projectList(bob2Creds)

const invitation = await new Promise((resolve, reject) => {
const timer = setTimeout(() => reject(new Error('No invite received')), 15000)
bob2ProjectList.start(null, {
error: async (err) => { clearTimeout(timer); reject(err) },
invited: async (project) => {
clearTimeout(timer)
await bob2ProjectList.stop()
resolve(project)
}
})
})

// Bob: join project, hydrate
await bob2ProjectList.join(invitation.id)
const bob2Project = await bob2Client.project(bob2Creds)
const structure = await bob2Project.hydrate({
id: invitation.id,
upstreamId: bob2ProjectList.wellKnown.get(invitation.id)
})
assert.ok(structure.invitations.length > 0)

// Bob: start project stream FIRST, then join layer
// joinLayer() should restart the sync so the room appears immediately
const receivedOps = []
let receivedResolve
const receivedPromise = new Promise(resolve => { receivedResolve = resolve })
const receivedTimeout = setTimeout(() => receivedResolve(), 15000)

bob2Project.start(undefined, {
received: async ({ id, operations }) => {
receivedOps.push(...operations)
if (receivedOps.length >= testOps.length) {
clearTimeout(receivedTimeout)
receivedResolve()
}
},
error: async (err) => console.error('Stream error:', err)
})

// Give stream a moment to start its first long-poll
await new Promise(resolve => setTimeout(resolve, 1000))

// Bob joins the layer — this should restart the sync
const inv = structure.invitations[0]
await bob2Project.joinLayer(inv.id)

// Wait for content via received()
await receivedPromise
await bob2Project.stop()
await alice2Project.commandAPI.stop()

console.log(` Sync-gated received() delivered ${receivedOps.length} operations`)

assert.equal(receivedOps.length, testOps.length,
`Expected ${testOps.length} operations via received(), got ${receivedOps.length}`)

for (let i = 0; i < testOps.length; i++) {
assert.deepStrictEqual(receivedOps[i], testOps[i],
`Operation at index ${i} out of order`)
}
})
})
2 changes: 2 additions & 0 deletions test/sync-gated-content.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ const createTimelineAPI = ({ syncChunks = [], contentResult = { events: [] }, cr
}
},

restartSync: () => {},

// For assertions
_contentCallCount: 0,
_lastContentFilter: null,
Expand Down