Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ endif

$(BUILD)/%.o: c_src/%.c
@echo " CC $(notdir $@)"
$(CC) -c $(ERL_CFLAGS) $(CFLAGS) -o $@ $<
$(CC) -c $(ERL_CFLAGS) $(CFLAGS) -MMD -MP -o $@ $<

# Include dependency files for automatic header tracking
-include $(OBJ:.o=.d)

$(LIB_NAME): $(OBJ)
@echo " LD $(notdir $@)"
Expand All @@ -152,7 +155,7 @@ $(PREFIX) $(BUILD):
mkdir -p $@

clean:
$(RM) $(LIB_NAME) $(ARCHIVE_NAME) $(OBJ)
$(RM) $(LIB_NAME) $(ARCHIVE_NAME) $(OBJ) $(OBJ:.o=.d)

.PHONY: all clean

Expand Down
187 changes: 182 additions & 5 deletions c_src/sqlite3_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ typedef struct connection
ErlNifMutex* mutex;
ErlNifMutex* interrupt_mutex;
ErlNifPid update_hook_pid;

// Custom busy handler state
volatile int cancelled; // volatile for MSVC compat
int busy_timeout_ms;
ErlNifEnv* callback_env; // for enif_is_process_alive
ErlNifPid caller_pid;
} connection_t;

typedef struct statement
Expand Down Expand Up @@ -287,6 +293,81 @@ statement_release_lock(statement_t* statement)
connection_release_lock(statement->conn);
}

// ---------------------------------------------------------------------------
// Custom busy handler
//
// Replaces SQLite's default busy handler (which sleeps via sqlite3OsSleep and
// cannot be interrupted) with one that polls conn->cancelled between each
// sqlite3_sleep() call. cancel() sets the flag and calls sqlite3_interrupt()
// so disconnect() wakes within at most one sleep interval (~10ms).
// ---------------------------------------------------------------------------

static int
exqlite_busy_handler(void* arg, int count)
{
connection_t* conn = (connection_t*)arg;

if (conn->cancelled)
return 0;

// Check if the calling process is still alive
if (conn->callback_env != NULL &&
!enif_is_process_alive(conn->callback_env, &conn->caller_pid)) {
conn->cancelled = 1;
return 0;
}

if (conn->busy_timeout_ms <= 0)
return 0;

static const int delays[] = {1, 2, 5, 10, 15, 20, 25, 25, 25, 50, 50};
static const int ndelay = sizeof(delays) / sizeof(delays[0]);

int total_waited = 0;
for (int i = 0; i < count && i < ndelay; i++)
total_waited += delays[i];
if (count >= ndelay)
total_waited += (count - ndelay) * 50;

if (total_waited >= conn->busy_timeout_ms)
return 0;

int sleep_ms = (count < ndelay) ? delays[count] : 50;
int remaining = conn->busy_timeout_ms - total_waited;
if (sleep_ms > remaining)
sleep_ms = remaining;

sqlite3_sleep(sleep_ms);

return conn->cancelled ? 0 : 1;
}

// Progress handler: fires every N VDBE opcodes.
// Returns non-zero to interrupt execution when cancelled.
static int
exqlite_progress_handler(void* arg)
{
connection_t* conn = (connection_t*)arg;
return conn->cancelled ? 1 : 0;
}

// Stash the current env + caller pid before a db operation.
// Must be called while holding conn->mutex.
static inline void
connection_stash_caller(connection_t* conn, ErlNifEnv* env)
{
conn->callback_env = env;
enif_self(env, &conn->caller_pid);
conn->cancelled = 0;
}

// Clear the stashed caller after a db operation completes.
static inline void
connection_clear_caller(connection_t* conn)
{
conn->callback_env = NULL;
}

///
/// Opens a new SQLite database
///
Expand Down Expand Up @@ -329,23 +410,30 @@ exqlite_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
return make_error_tuple(env, am_failed_to_create_mutex);
}

sqlite3_busy_timeout(db, 2000);

conn = enif_alloc_resource(connection_type, sizeof(connection_t));
if (!conn) {
sqlite3_close_v2(db);
enif_mutex_destroy(mutex);
return make_error_tuple(env, am_out_of_memory);
}
conn->db = db;
conn->mutex = mutex;
conn->db = db;
conn->mutex = mutex;

// Initialize busy handler fields
conn->cancelled = 0;
conn->busy_timeout_ms = 2000; // default matches sqlite3_busy_timeout(db, 2000)
conn->callback_env = NULL;

conn->interrupt_mutex = enif_mutex_create("exqlite:interrupt");
if (conn->interrupt_mutex == NULL) {
// conn->db and conn->mutex are set; the destructor will clean them up.
enif_release_resource(conn);
return make_error_tuple(env, am_failed_to_create_mutex);
}

// Install our custom busy handler + progress handler
sqlite3_busy_handler(db, exqlite_busy_handler, conn);
sqlite3_progress_handler(db, 1000, exqlite_progress_handler, conn);

result = enif_make_resource(env, conn);
enif_release_resource(conn);

Expand Down Expand Up @@ -375,9 +463,11 @@ exqlite_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
// cases. Cases such as query timeout and connection pooling
// attempting to close the connection
connection_acquire_lock(conn);
connection_stash_caller(conn, env);

// DB is already closed, nothing to do here.
if (conn->db == NULL) {
connection_clear_caller(conn);
connection_release_lock(conn);
return am_ok;
}
Expand All @@ -386,6 +476,7 @@ exqlite_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
if (autocommit == 0) {
rc = sqlite3_exec(conn->db, "ROLLBACK;", NULL, NULL, NULL);
if (rc != SQLITE_OK) {
connection_clear_caller(conn);
connection_release_lock(conn);
return make_sqlite3_error_tuple(env, rc, conn->db);
}
Expand All @@ -411,6 +502,7 @@ exqlite_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
conn->db = NULL;
enif_mutex_unlock(conn->interrupt_mutex);

connection_clear_caller(conn);
connection_release_lock(conn);

return am_ok;
Expand Down Expand Up @@ -442,18 +534,22 @@ exqlite_execute(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}

connection_acquire_lock(conn);
connection_stash_caller(conn, env);

if (conn->db == NULL) {
connection_clear_caller(conn);
connection_release_lock(conn);
return make_error_tuple(env, am_connection_closed);
}

rc = sqlite3_exec(conn->db, (char*)bin.data, NULL, NULL, NULL);
if (rc != SQLITE_OK) {
connection_clear_caller(conn);
connection_release_lock(conn);
return make_sqlite3_error_tuple(env, rc, conn->db);
}

connection_clear_caller(conn);
connection_release_lock(conn);

return am_ok;
Expand Down Expand Up @@ -525,7 +621,9 @@ exqlite_prepare(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])

// ensure connection is not getting closed by parallel thread
connection_acquire_lock(conn);
connection_stash_caller(conn, env);
if (conn->db == NULL) {
connection_clear_caller(conn);
connection_release_lock(conn);
enif_release_resource(statement);
return make_error_tuple(env, am_connection_closed);
Expand All @@ -535,11 +633,13 @@ exqlite_prepare(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])

if (rc != SQLITE_OK) {
result = make_sqlite3_error_tuple(env, rc, conn->db);
connection_clear_caller(conn);
connection_release_lock(conn);
enif_release_resource(statement);
return result;
}

connection_clear_caller(conn);
connection_release_lock(conn);

result = enif_make_resource(env, statement);
Expand Down Expand Up @@ -808,8 +908,10 @@ exqlite_multi_step(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}

connection_acquire_lock(conn);
connection_stash_caller(conn, env);

if (statement->statement == NULL) {
connection_clear_caller(conn);
connection_release_lock(conn);
return make_error_tuple(env, am_invalid_statement);
}
Expand All @@ -822,11 +924,13 @@ exqlite_multi_step(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
switch (rc) {
case SQLITE_BUSY:
sqlite3_reset(statement->statement);
connection_clear_caller(conn);
connection_release_lock(conn);
return am_busy;

case SQLITE_DONE:
sqlite3_reset(statement->statement);
connection_clear_caller(conn);
connection_release_lock(conn);
return enif_make_tuple2(env, am_done, rows);

Expand All @@ -837,11 +941,13 @@ exqlite_multi_step(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])

default:
sqlite3_reset(statement->statement);
connection_clear_caller(conn);
connection_release_lock(conn);
return make_sqlite3_error_tuple(env, rc, conn->db);
}
}

connection_clear_caller(conn);
connection_release_lock(conn);

return enif_make_tuple2(env, am_rows, rows);
Expand Down Expand Up @@ -874,8 +980,10 @@ exqlite_step(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}

connection_acquire_lock(conn);
connection_stash_caller(conn, env);

if (statement->statement == NULL) {
connection_clear_caller(conn);
connection_release_lock(conn);
return make_error_tuple(env, am_invalid_statement);
}
Expand All @@ -884,19 +992,23 @@ exqlite_step(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
switch (rc) {
case SQLITE_ROW:
result = enif_make_tuple2(env, am_row, make_row(env, statement->statement));
connection_clear_caller(conn);
connection_release_lock(conn);
return result;
case SQLITE_BUSY:
sqlite3_reset(statement->statement);
connection_clear_caller(conn);
connection_release_lock(conn);
return am_busy;
case SQLITE_DONE:
sqlite3_reset(statement->statement);
connection_clear_caller(conn);
connection_release_lock(conn);
return am_done;
default:
sqlite3_reset(statement->statement);
result = make_sqlite3_error_tuple(env, rc, conn->db);
connection_clear_caller(conn);
connection_release_lock(conn);
return result;
}
Expand Down Expand Up @@ -1178,6 +1290,10 @@ connection_type_destructor(ErlNifEnv* env, void* arg)

connection_t* conn = (connection_t*)arg;

// Signal cancel to wake any busy handler that might still be sleeping,
// so it returns and releases SQLite's db->mutex before we close.
conn->cancelled = 1;

if (conn->db) {
sqlite3_close_v2(conn->db);
conn->db = NULL;
Expand Down Expand Up @@ -1516,6 +1632,65 @@ exqlite_interrupt(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
return am_ok;
}

///
/// Set busy timeout without destroying the custom handler.
/// (PRAGMA busy_timeout calls sqlite3_busy_timeout() which replaces handlers)
///
ERL_NIF_TERM
exqlite_set_busy_timeout(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
assert(env);

connection_t* conn = NULL;
int timeout_ms;

if (argc != 2) {
return enif_make_badarg(env);
}

if (!enif_get_resource(env, argv[0], connection_type, (void**)&conn)) {
return make_error_tuple(env, am_invalid_connection);
}

if (!enif_get_int(env, argv[1], &timeout_ms)) {
return enif_make_badarg(env);
}

conn->busy_timeout_ms = timeout_ms;

return am_ok;
}

/// Cancel: wake busy handler + interrupt VDBE.
/// Superset of interrupt/1: sets cancelled flag + calls sqlite3_interrupt().
///
ERL_NIF_TERM
exqlite_cancel(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
assert(env);

connection_t* conn = NULL;

if (argc != 1) {
return enif_make_badarg(env);
}

if (!enif_get_resource(env, argv[0], connection_type, (void**)&conn)) {
return make_error_tuple(env, am_invalid_connection);
}

conn->cancelled = 1;

// Also interrupt VDBE execution (same as interrupt/1)
enif_mutex_lock(conn->interrupt_mutex);
if (conn->db != NULL) {
sqlite3_interrupt(conn->db);
}
enif_mutex_unlock(conn->interrupt_mutex);

return am_ok;
}

ERL_NIF_TERM
exqlite_errmsg(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
Expand Down Expand Up @@ -1592,6 +1767,8 @@ static ErlNifFunc nif_funcs[] = {
{"set_update_hook", 2, exqlite_set_update_hook, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"set_log_hook", 1, exqlite_set_log_hook, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"interrupt", 1, exqlite_interrupt, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"set_busy_timeout", 2, exqlite_set_busy_timeout, 0},
{"cancel", 1, exqlite_cancel, 0},
{"errmsg", 1, exqlite_errmsg},
{"errstr", 1, exqlite_errstr},
};
Expand Down
Loading
Loading