Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import java.time.Instant
* or [toFailed] — each returning a new immutable copy.
*/
data class OutboxEntry(
@get:JvmName("getOutboxId")
val outboxId: OutboxId,
val messageType: String,
val payload: String,
Expand Down Expand Up @@ -47,6 +48,7 @@ data class OutboxEntry(

companion object {
/** Creates a new PENDING entry from a [message] and [deliveryInfo]. */
@JvmStatic
fun createPending(message: OutboxMessage, deliveryInfo: DeliveryInfo, now: Instant): OutboxEntry =
createPending(message, deliveryType = deliveryInfo.type, deliveryMetadata = deliveryInfo.serialize(), now = now)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class OutboxProcessor(
private val store: OutboxStore,
private val entryProcessor: OutboxEntryProcessor,
) {
@JvmOverloads
fun processNext(limit: Int = 10) {
store.claimPending(limit).forEach { entry ->
val updated = entryProcessor.process(entry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import java.time.Clock
* Framework-specific modules (e.g. `okapi-spring`) may provide wrappers that
* validate transactional context before delegating here.
*/
class OutboxPublisher(
class OutboxPublisher @JvmOverloads constructor(
private val outboxStore: OutboxStore,
private val clock: Clock = Clock.systemUTC(),
) {
@JvmName("publish")
fun publish(outboxMessage: OutboxMessage, deliveryInfo: DeliveryInfo): OutboxId = OutboxEntry
.createPending(outboxMessage, deliveryInfo, clock.instant())
.also(outboxStore::persist)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import java.util.concurrent.atomic.AtomicBoolean
*
* Delegates to [OutboxStore.removeDeliveredBefore] -- works with any storage adapter.
*/
class OutboxPurger(
class OutboxPurger @JvmOverloads constructor(
private val outboxStore: OutboxStore,
private val config: OutboxPurgerConfig = OutboxPurgerConfig(),
private val clock: Clock = Clock.systemUTC(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import java.util.concurrent.atomic.AtomicBoolean
* - `okapi-spring-boot`: `SmartLifecycle`
* - `okapi-ktor`: `ApplicationStarted` / `ApplicationStopped`
*/
class OutboxScheduler(
class OutboxScheduler @JvmOverloads constructor(
private val outboxProcessor: OutboxProcessor,
private val transactionRunner: TransactionRunner? = null,
private val config: OutboxSchedulerConfig = OutboxSchedulerConfig(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ enum class OutboxStatus {

companion object {
/** Resolves a status by matching the given [value] against enum entry names. Throws if unknown. */
@JvmStatic
fun from(value: String): OutboxStatus = requireNotNull(entries.find { it.name == value }) {
"Unknown outbox status: $value"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class TransactionalOutboxPublisher(
private val delegate: OutboxPublisher,
private val validator: TransactionContextValidator,
) {
@JvmName("publish")
fun publish(outboxMessage: OutboxMessage, deliveryInfo: DeliveryInfo): OutboxId {
check(validator.isInActiveReadWriteTransaction()) { validator.failureMessage }
return delegate.publish(outboxMessage, deliveryInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.softwaremill.okapi.core.DeliveryInfo
* [serviceName] is resolved to a base URL via [ServiceUrlResolver] at delivery time.
* [endpointPath] is appended to form the full URL.
*/
data class HttpDeliveryInfo(
data class HttpDeliveryInfo @JvmOverloads constructor(
override val type: String = TYPE,
val serviceName: String,
val endpointPath: String,
Expand All @@ -29,6 +29,7 @@ data class HttpDeliveryInfo(
private val mapper = jacksonObjectMapper()

/** Deserializes from JSON stored in [OutboxEntry.deliveryMetadata]. */
@JvmStatic
fun deserialize(json: String): HttpDeliveryInfo = mapper.readValue(json)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.time.Duration
*
* Connection errors are treated as retriable.
*/
class HttpMessageDeliverer(
class HttpMessageDeliverer @JvmOverloads constructor(
private val urlResolver: ServiceUrlResolver,
private val httpClient: HttpClient = defaultHttpClient(),
private val retriableStatusCodes: Set<Int> = DEFAULT_RETRIABLE_STATUS_CODES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.softwaremill.okapi.core.DeliveryInfo
* [topic] is required. Optional [partitionKey] controls partition routing.
* Custom [headers] are sent as UTF-8 encoded Kafka record headers.
*/
data class KafkaDeliveryInfo(
data class KafkaDeliveryInfo @JvmOverloads constructor(
override val type: String = TYPE,
val topic: String,
val partitionKey: String? = null,
Expand All @@ -27,6 +27,7 @@ data class KafkaDeliveryInfo(
private val mapper = jacksonObjectMapper()

/** Deserializes from JSON stored in [OutboxEntry.deliveryMetadata]. */
@JvmStatic
fun deserialize(json: String): KafkaDeliveryInfo = mapper.readValue(json)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class SpringOutboxPublisher(delegate: OutboxPublisher, dataSource: DataSource) {
*
* @throws IllegalStateException if no active read-write transaction is present.
*/
@JvmName("publish")
fun publish(outboxMessage: OutboxMessage, deliveryInfo: DeliveryInfo): OutboxId =
transactionalPublisher.publish(outboxMessage, deliveryInfo)
}