Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions src/features/sync/lib/Sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,23 +268,20 @@ export class SyncService extends AuthenticatedDropboxService {
private async uploadFileInAssembly(dbxPath: string, uploadUrl: string, copilotApi: CopilotAPI) {
logger.info('SyncService#uploadFileInAssembly :: Uploading file to Assembly', dbxPath)

const dbx = this.dbxClient.getDropboxClient()
const fileMetaData = await dbx.filesDownload({ path: dbxPath }) // get metadata for the files
logger.info('SyncService#uploadFileInAssembly :: File metadata downloaded', dbxPath)

const downloadBody = await this.dbxClient.downloadFile({
// Stream the file directly from Dropbox into Assembly's S3 upload URL.
// `contentLength` comes from the download response headers, guaranteeing it
// matches the exact bytes in the stream. Avoids the Dropbox SDK's
// `filesDownload` which buffers the full file in memory (OOMs on videos).
const { body: downloadBody, contentLength } = await this.dbxClient.downloadFile({
urlPath: DBX_URL_PATH.fileDownload,
filePath: dbxPath,
rootNamespaceId: z.string().parse(this.connectionToken.rootNamespaceId),
refreshToken: this.connectionToken.refreshToken,
})
logger.info('SyncService#uploadFileInAssembly :: Found downloadBody', Boolean(downloadBody))

// upload file to assembly
const fileUploadResp = await copilotApi.uploadFile(
uploadUrl,
fileMetaData.result.size.toString(),
downloadBody,
)
const fileUploadResp = await copilotApi.uploadFile(uploadUrl, contentLength, downloadBody)
logger.info('SyncService#uploadFileInAssembly :: File uploaded to Assembly', dbxPath)

if (fileUploadResp.status !== httpStatus.OK) {
Expand Down Expand Up @@ -430,6 +427,7 @@ export class SyncService extends AuthenticatedDropboxService {
filePath: path,
body: resp.body,
rootNamespaceId: z.string().parse(this.connectionToken.rootNamespaceId),
refreshToken: this.connectionToken.refreshToken,
})
logger.info('SyncService#uploadFileInDropbox :: File uploaded to', path)
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ const processFailedSync = async (
if (file && file.status !== 'pending') return

const fileInDropbox = await getFileFromDropbox(dbxClient, failedSync.dbxFileId ?? '')
if (!fileInDropbox) return
// Guard against non-file entries: a mapped dbxFileId could resolve to a folder
// or deleted entry after a Dropbox rename/delete-and-recreate. Only resync files.
if (!fileInDropbox || fileInDropbox['.tag'] !== 'file') return

const channelSync = await db.query.channelSync.findFirst({
where: (channelSync, { eq }) => eq(channelSync.id, failedSync.channelSyncId),
Expand Down
31 changes: 29 additions & 2 deletions src/lib/dropbox/DropboxClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,18 @@ export class DropboxClient {
urlPath,
filePath,
rootNamespaceId,
refreshToken,
}: {
urlPath: string
filePath: string
rootNamespaceId: string
}) {
refreshToken: string
}): Promise<{ body: NodeJS.ReadableStream | null; contentLength: string }> {
// Ensure a valid access token before the download. The DropboxAuth instance
// is created per-task with only the refresh token set; without an explicit
// refresh here, `getAccessToken()` returns undefined and Dropbox 400s.
await this.dbxAuthClient.refreshAccessToken(refreshToken)

const headers = {
Authorization: `Bearer ${this.dbxAuthClient.authInstance.getAccessToken()}`,
'Dropbox-API-Path-Root': dropboxArgHeader({
Expand All @@ -126,7 +133,19 @@ export class DropboxClient {
throw new DropboxResponseError(response.status, response.headers, {
error_summary: 'DropboxClient#downloadFile. Failed to download file', // following the dropbox response error convention with snake case
})
return response.body

// Use the Content-Length from the download response as the upload size.
// This is the exact byte count about to be streamed, guaranteed to match
// what the upstream (S3) sees — unlike the size in `filesListFolder` entries,
// which can diverge from the downloaded stream for certain file types.
const contentLength = response.headers.get('content-length')
if (!contentLength) {
throw new Error(
`DropboxClient#downloadFile. Missing Content-Length header for file: ${filePath}`,
)
Comment on lines +143 to +145
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SandipBajracharya Do we actually expect this to be called or is it there just for type safety? And if this can be a legit cause, is it critical enough to not sync the file? Could you provide more context on what happens when we actually get this error?

Will it be marked as failed? retried again??

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we get this error, it is retried 3 times by syncDropboxFilesToAssembly task before it completely fails. On failure we get reported via Sentry and we've also set up slack alerts. The failure of single task does not impact the overall sync. We have a trigger.dev task: resyncFailedFilesInAssembly that tries to resync those failed records.

}

return { body: response.body, contentLength }
}

/**
Expand All @@ -138,12 +157,20 @@ export class DropboxClient {
filePath,
body,
rootNamespaceId,
refreshToken,
}: {
urlPath: string
filePath: string
body: NodeJS.ReadableStream | null
rootNamespaceId: string
refreshToken: string
}): Promise<DropboxFileMetadata> {
// Explicit token refresh — mirrors `_downloadFile`. Previously relied on
// an implicit refresh from a preceding SDK call (e.g. filesGetMetadata),
// which is a fragile contract — any future caller reaching this method
// without a prior SDK call would 400 on an unpopulated Bearer.
await this.dbxAuthClient.refreshAccessToken(refreshToken)

const args = {
path: filePath,
autorename: false,
Expand Down
Loading