Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private object DisconnectedApi : KetchApi {

override suspend fun resolve(
url: String,
headers: Map<String, String>,
properties: Map<String, String>,
): ResolvedSource {
throw IllegalStateException(
"No instance connected. Add a remote server first.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class FakeKetchApi(

override suspend fun resolve(
url: String,
headers: Map<String, String>,
properties: Map<String, String>,
): ResolvedSource {
throw UnsupportedOperationException(
"FakeKetchApi does not support resolve"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ interface KetchApi {
* during [download].
*
* @param url the URL to resolve
* @param headers optional HTTP headers to include in the probe
* @param properties source-specific key-value pairs. For HTTP
* sources this contains HTTP headers; other sources may
* interpret them differently or ignore them.
*/
suspend fun resolve(
url: String,
headers: Map<String, String> = emptyMap(),
properties: Map<String, String> = emptyMap(),
): ResolvedSource

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,11 @@ class Ketch(

override suspend fun resolve(
url: String,
headers: Map<String, String>,
properties: Map<String, String>,
): ResolvedSource {
log.i { "Resolving URL: $url" }
val source = sourceResolver.resolve(url)
return source.resolve(url, headers)
return source.resolve(url, properties)
}

override suspend fun start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ import kotlinx.coroutines.flow.MutableStateFlow
* call this with the number of bytes before writing each chunk.
* This replaces direct [SpeedLimiter] access to avoid cross-module
* visibility issues with internal types.
* @property headers HTTP headers or source-specific metadata headers
* @property headers request headers from [DownloadRequest.headers].
* Used by HTTP sources for custom headers; other sources may ignore.
* @property preResolved pre-resolved URL metadata, allowing the
* download source to skip its own probe/HEAD request
* @property maxConnections observable override for the number of
* concurrent segment connections. When positive, takes precedence
* over [DownloadRequest.connections]. Emitting a new value triggers
* live resegmentation in [HttpDownloadSource]. Reduced automatically
* on HTTP 429 (Too Many Requests) responses.
* live resegmentation in sources that support it. Reduced
* automatically on HTTP 429 (Too Many Requests) responses.
* @property pendingResegment target connection count for a pending
* resegmentation. Set by the connection-change watcher before
* canceling the download batch scope. Read by [HttpDownloadSource]
* to distinguish resegment-cancel from external cancel.
* canceling the download batch scope. Read by sources to
* distinguish resegment-cancel from external cancel.
*/
class DownloadContext(
val taskId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ internal class DownloadExecution(
totalBytes = total

val fileName = resolvedUrl.suggestedFileName
?: fileNameResolver.resolve(
request, toServerInfo(resolvedUrl),
)
?: fileNameResolver.resolve(request, resolvedUrl)
val outputPath = resolveDestPath(
destination = request.destination,
defaultDir = config.defaultDirectory ?: "downloads",
Expand All @@ -162,30 +160,31 @@ internal class DownloadExecution(
outputPath = outputPath,
state = TaskState.DOWNLOADING,
totalBytes = total,
acceptRanges = resolvedUrl.supportsResume,
etag = resolvedUrl.metadata[HttpDownloadSource.META_ETAG],
lastModified = resolvedUrl.metadata[
HttpDownloadSource.META_LAST_MODIFIED,
],
sourceType = source.type,
sourceResumeState = source.buildResumeState(
resolvedUrl, total,
),
updatedAt = now,
)
}

taskLimiter.delegate = createLimiter(request.speedLimit)

val preResolved = if (resolved != null) resolvedUrl else null
runDownload(
outputPath, total, source.managesOwnFileIo, preResolved,
) { ctx ->
runDownload(outputPath, total, source, preResolved) { ctx ->
source.download(ctx)
}
}

private suspend fun executeResume(info: ResumeInfo) {
val taskRecord = info.record

val sourceType = taskRecord.sourceType ?: HttpDownloadSource.TYPE
val sourceType = taskRecord.sourceType
?: throw KetchError.Unknown(
IllegalStateException(
"No sourceType for taskId=${taskRecord.taskId}",
),
)
val source = sourceResolver.resolveByType(sourceType)
log.i {
"Resuming download for taskId=$taskId via " +
Expand All @@ -202,16 +201,11 @@ internal class DownloadExecution(
taskLimiter.delegate = createLimiter(taskRecord.request.speedLimit)

val resumeState = taskRecord.sourceResumeState
?: HttpDownloadSource.buildResumeState(
etag = taskRecord.etag,
lastModified = taskRecord.lastModified,
totalBytes = taskRecord.totalBytes,
?: throw KetchError.CorruptResumeState(
"No resume state for taskId=${taskRecord.taskId}",
)

runDownload(
outputPath, taskRecord.totalBytes, source.managesOwnFileIo,
) { ctx ->
context = ctx
runDownload(outputPath, taskRecord.totalBytes, source) { ctx ->
source.resume(ctx, resumeState)
}
}
Expand All @@ -221,16 +215,18 @@ internal class DownloadExecution(
* builds the [DownloadContext], runs [downloadBlock] with retry,
* flushes, persists completion, and cleans up.
*
* When [selfManagedIo] is `true`, the source handles its own file
* I/O so we use [NoOpFileAccessor] and skip flush/cleanup.
* When [DownloadSource.managesOwnFileIo] is `true`, the source
* handles its own file I/O so we use [NoOpFileAccessor] and skip
* flush/cleanup.
*/
private suspend fun runDownload(
outputPath: String,
total: Long,
selfManagedIo: Boolean = false,
source: DownloadSource,
preResolved: ResolvedSource? = null,
downloadBlock: suspend (DownloadContext) -> Unit,
) {
val selfManagedIo = source.managesOwnFileIo
val fa = if (selfManagedIo) {
NoOpFileAccessor
} else {
Expand All @@ -248,10 +244,12 @@ internal class DownloadExecution(
while (true) {
delay(config.saveIntervalMs)
val snapshot = handle.mutableSegments.value
val downloaded = snapshot.sumOf { it.downloadedBytes }
val updatedResume = source.updateResumeState(ctx)
handle.record.update {
it.copy(
segments = snapshot,
sourceResumeState = updatedResume
?: it.sourceResumeState,
updatedAt = Clock.System.now(),
)
}
Expand All @@ -278,11 +276,6 @@ internal class DownloadExecution(
it.copy(
state = TaskState.COMPLETED,
segments = null,
sourceResumeState = HttpDownloadSource.buildResumeState(
etag = it.etag,
lastModified = it.lastModified,
totalBytes = it.totalBytes,
),
updatedAt = Clock.System.now(),
)
}
Expand Down Expand Up @@ -479,17 +472,6 @@ internal class DownloadExecution(
}
}

private fun toServerInfo(resolved: ResolvedSource): ServerInfo {
return ServerInfo(
contentLength = resolved.totalBytes,
acceptRanges = resolved.supportsResume,
etag = resolved.metadata[HttpDownloadSource.META_ETAG],
lastModified = resolved.metadata[
HttpDownloadSource.META_LAST_MODIFIED,
],
)
}

private fun resolveDestPath(
destination: com.linroid.ketch.api.Destination?,
defaultDir: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import com.linroid.ketch.api.ResolvedSource
*
* Each source handles a specific protocol or download mechanism.
* The default implementation is [HttpDownloadSource] for HTTP/HTTPS
* downloads. Future implementations may include torrent, media
* extraction, or other protocols.
* downloads. Other implementations include FTP, BitTorrent, etc.
*
* Sources are registered with [SourceResolver] which routes
* download requests to the appropriate source based on URL matching.
Expand All @@ -32,10 +31,15 @@ interface DownloadSource {
* Resolves source metadata for the given URL without downloading.
* This is analogous to an HTTP HEAD request but generalized for
* any source type.
*
* @param url the URL to resolve
* @param properties source-specific key-value pairs. For HTTP
* sources this contains HTTP headers; other sources may
* interpret them differently or ignore them.
*/
suspend fun resolve(
url: String,
headers: Map<String, String> = emptyMap(),
properties: Map<String, String> = emptyMap(),
): ResolvedSource

/**
Expand All @@ -56,4 +60,36 @@ interface DownloadSource {
* [com.linroid.ketch.api.KetchError.Unsupported].
*/
suspend fun resume(context: DownloadContext, resumeState: SourceResumeState)

/**
* Builds an opaque [SourceResumeState] from resolved metadata.
*
* Called after [resolve] completes to persist source-specific
* state needed for resume validation (e.g., HTTP ETag/Last-Modified,
* FTP MDTM, torrent info hash). The returned state is stored in
* the task record and passed back to [resume] on restart.
*
* @param resolved the metadata returned by [resolve]
* @param totalBytes total download size in bytes
*/
fun buildResumeState(
resolved: ResolvedSource,
totalBytes: Long,
): SourceResumeState

/**
* Called periodically during download to let the source update
* its resume state. The returned state replaces the current
* resume state in the task record.
*
* Default implementation returns `null` (no update needed).
* Sources like BitTorrent override this to persist bitfield
* progress incrementally.
*
* @param context the active download context
* @return updated resume state, or `null` to keep the current one
*/
suspend fun updateResumeState(
context: DownloadContext,
): SourceResumeState? = null
}
Loading
Loading