diff --git a/include/fluent-bit/flb_input_chunk.h b/include/fluent-bit/flb_input_chunk.h index 9f37c0a0bd5..1ff5f401706 100644 --- a/include/fluent-bit/flb_input_chunk.h +++ b/include/fluent-bit/flb_input_chunk.h @@ -166,6 +166,8 @@ void flb_input_chunk_ring_buffer_cleanup(struct flb_input_instance *ins); void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data); ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic); ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic); +int flb_input_chunk_release_route(struct flb_input_chunk *ic, + struct flb_output_instance *o_ins); size_t flb_input_chunk_set_limits(struct flb_input_instance *in); size_t flb_input_chunk_total_size(struct flb_input_instance *in); struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, diff --git a/src/flb_engine.c b/src/flb_engine.c index 5252a506d92..63178da843c 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -433,6 +434,8 @@ static inline int handle_output_event(uint64_t ts, flb_output_name(ins), out_id); } + flb_input_chunk_release_route(task->ic, ins); + cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, calculate_chunk_capacity_percent(ins), 1, (char *[]) {out_name}); diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 0a5f83ea7dc..fd93b159b29 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -56,6 +56,12 @@ #define FLB_INPUT_CHUNK_RAW_LOG_ROUTING (1 << 0) +/* + * chunkio file header overhead: + * 2 header bytes + 4 CRC32 + 16 padding + 2 metadata length bytes. + */ +#define FLB_INPUT_CHUNK_FILE_HEADER_SIZE 24 + struct input_chunk_raw { struct flb_input_instance *ins; int event_type; @@ -76,6 +82,27 @@ struct flb_input_chunk_meta_view { uint16_t routing_data_length; }; +/* + * Mirror the leading layout we need from chunkio's private cio_file backend. 
+ * + * We intentionally avoid including the private cio_file header here because that + * header pulls additional private includes that are not available through this + * translation unit's normal include path in this build. We only need the + * stable leading allocation fields to estimate alloc_size/realloc_size growth + * for storage.total_limit_size decisions. + */ +struct flb_input_chunk_cio_file_view { + int fd; + int flags; + int synced; + int allocate_strategy; + size_t fs_size; + size_t data_size; + size_t page_size; + size_t alloc_size; + size_t realloc_size; +}; + static inline int input_chunk_has_magic_bytes(char *buf, int len) { unsigned char *p; @@ -252,6 +279,19 @@ static int flb_input_chunk_drop_task_route( ssize_t *dropped_record_count, ssize_t *dropped_byte_count); +static int flb_input_chunk_has_other_routes( + struct flb_input_chunk *ic, + struct flb_output_instance *o_ins); + +static int flb_input_chunk_prefers_physical_delete( + struct flb_input_chunk *ic, + struct flb_output_instance *o_ins, + int release_scope); + +static size_t flb_input_chunk_get_projected_write_size( + struct flb_input_chunk *ic, + size_t append_size); + static ssize_t get_input_chunk_record_count(struct flb_input_chunk *input_chunk) { @@ -304,6 +344,8 @@ static int flb_input_chunk_release_space( ssize_t *required_space, int release_scope) { + int pass; + int pass_limit; struct mk_list *input_chunk_iterator_tmp; struct mk_list *input_chunk_iterator; struct flb_router *router; @@ -317,122 +359,145 @@ static int flb_input_chunk_release_space( released_space = 0; - mk_list_foreach_safe(input_chunk_iterator, input_chunk_iterator_tmp, - &input_plugin->chunks) { - old_input_chunk = mk_list_entry(input_chunk_iterator, - struct flb_input_chunk, _head); + pass_limit = 1; + if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL) { + pass_limit = 2; + } - if (!flb_routes_mask_get_bit(old_input_chunk->routes_mask, - output_plugin->id, - input_plugin->config->router)) { - continue; - } + for (pass = 0; pass < 
pass_limit; pass++) { + mk_list_foreach_safe(input_chunk_iterator, input_chunk_iterator_tmp, + &input_plugin->chunks) { + old_input_chunk = mk_list_entry(input_chunk_iterator, + struct flb_input_chunk, _head); - if (flb_input_chunk_safe_delete(new_input_chunk, - old_input_chunk, - output_plugin->id) == FLB_FALSE) { - continue; - } + if (!flb_routes_mask_get_bit(old_input_chunk->routes_mask, + output_plugin->id, + input_plugin->config->router)) { + continue; + } - if (flb_input_chunk_drop_task_route(old_input_chunk->task, - output_plugin, - &dropped_record_count, - &dropped_byte_count) == FLB_FALSE) { - continue; - } + if (flb_input_chunk_safe_delete(new_input_chunk, + old_input_chunk, + output_plugin->id) == FLB_FALSE) { + continue; + } - chunk_size = flb_input_chunk_get_real_size(old_input_chunk); - chunk_released = FLB_FALSE; - chunk_destroy_flag = FLB_FALSE; + if (pass == 0 && + flb_input_chunk_prefers_physical_delete(old_input_chunk, + output_plugin, + release_scope) == FLB_FALSE) { + continue; + } - if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL) { - flb_routes_mask_clear_bit(old_input_chunk->routes_mask, - output_plugin->id, - input_plugin->config->router); + chunk_size = flb_input_chunk_get_real_size(old_input_chunk); + if (chunk_size < 0) { + flb_debug("[input chunk] could not retrieve chunk real size"); + continue; + } - FS_CHUNK_SIZE_DEBUG_MOD(output_plugin, old_input_chunk, chunk_size); - output_plugin->fs_chunks_size -= chunk_size; + chunk_released = FLB_FALSE; + chunk_destroy_flag = FLB_FALSE; - chunk_destroy_flag = flb_routes_mask_is_empty( - old_input_chunk->routes_mask, - input_plugin->config->router); + if (flb_input_chunk_drop_task_route(old_input_chunk->task, + output_plugin, + &dropped_record_count, + &dropped_byte_count) == FLB_FALSE) { + continue; + } - chunk_released = FLB_TRUE; - } - else if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL) { - chunk_destroy_flag = FLB_TRUE; - } + if (release_scope == 
FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL) { + flb_routes_mask_clear_bit(old_input_chunk->routes_mask, + output_plugin->id, + input_plugin->config->router); -#ifdef FLB_HAVE_METRICS - if (dropped_record_count < 0) { - dropped_record_count = get_input_chunk_record_count(old_input_chunk); - } + FS_CHUNK_SIZE_DEBUG_MOD(output_plugin, old_input_chunk, chunk_size); + output_plugin->fs_chunks_size -= chunk_size; - if (dropped_byte_count < 0) { - dropped_byte_count = chunk_size; - } + chunk_destroy_flag = flb_routes_mask_is_empty( + old_input_chunk->routes_mask, + input_plugin->config->router); - if (dropped_record_count == -1) { - flb_debug("[task] error getting chunk record count : %s", - old_input_chunk->in->name); - } - else if (dropped_record_count > 0) { - cmt_counter_add(output_plugin->cmt_dropped_records, - cfl_time_now(), - dropped_record_count, - 1, (char *[]) {(char *) flb_output_name(output_plugin)}); + chunk_released = FLB_TRUE; + } + else if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL) { + chunk_destroy_flag = FLB_TRUE; + } - if (input_plugin->config && input_plugin->config->router && - old_input_chunk->event_type == FLB_INPUT_LOGS) { - router = input_plugin->config->router; +#ifdef FLB_HAVE_METRICS + if (dropped_record_count < 0) { + dropped_record_count = get_input_chunk_record_count(old_input_chunk); + } - cmt_counter_add(router->logs_drop_records_total, - cfl_time_now(), - (double) dropped_record_count, - 2, - (char *[]){(char *) flb_input_name(old_input_chunk->in), - (char *) flb_output_name(output_plugin)}); + if (dropped_byte_count < 0) { + dropped_byte_count = chunk_size; + } - cmt_counter_add(router->logs_drop_bytes_total, - cfl_time_now(), - (double) dropped_byte_count, - 2, - (char *[]){(char *) flb_input_name(old_input_chunk->in), - (char *) flb_output_name(output_plugin)}); + if (dropped_record_count == -1) { + flb_debug("[task] error getting chunk record count : %s", + old_input_chunk->in->name); } + else if (dropped_record_count > 0) { + 
cmt_counter_add(output_plugin->cmt_dropped_records, + cfl_time_now(), + dropped_record_count, + 1, (char *[]) {(char *) flb_output_name(output_plugin)}); + + if (input_plugin->config && input_plugin->config->router && + old_input_chunk->event_type == FLB_INPUT_LOGS) { + router = input_plugin->config->router; + + cmt_counter_add(router->logs_drop_records_total, + cfl_time_now(), + (double) dropped_record_count, + 2, + (char *[]){(char *) flb_input_name(old_input_chunk->in), + (char *) flb_output_name(output_plugin)}); + + cmt_counter_add(router->logs_drop_bytes_total, + cfl_time_now(), + (double) dropped_byte_count, + 2, + (char *[]){(char *) flb_input_name(old_input_chunk->in), + (char *) flb_output_name(output_plugin)}); + } - flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, - dropped_record_count, - output_plugin->metrics); - } + flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, + dropped_record_count, + output_plugin->metrics); + } #endif - if (chunk_destroy_flag) { - if (old_input_chunk->task != NULL) { - /* - * If the chunk is referenced by a task and task has no active route, - * we need to destroy the task as well. - */ - if (old_input_chunk->task->users == 0) { - flb_debug("[task] drop task_id %d with no active route from input plugin %s", - old_input_chunk->task->id, new_input_chunk->in->name); - flb_task_destroy(old_input_chunk->task, FLB_TRUE); + if (chunk_destroy_flag) { + if (old_input_chunk->task != NULL) { + /* + * If the chunk is referenced by a task and task has no active route, + * we need to destroy the task as well. 
+ */ + if (old_input_chunk->task->users == 0) { + flb_debug("[task] drop task_id %d with no active route from input plugin %s", + old_input_chunk->task->id, new_input_chunk->in->name); + flb_task_destroy(old_input_chunk->task, FLB_TRUE); + + chunk_released = FLB_TRUE; + } + } + else { + flb_debug("[input chunk] drop chunk %s with no output route from input plugin %s", + flb_input_chunk_get_name(old_input_chunk), new_input_chunk->in->name); + + flb_input_chunk_destroy(old_input_chunk, FLB_TRUE); chunk_released = FLB_TRUE; } } - else { - flb_debug("[input chunk] drop chunk %s with no output route from input plugin %s", - flb_input_chunk_get_name(old_input_chunk), new_input_chunk->in->name); - - flb_input_chunk_destroy(old_input_chunk, FLB_TRUE); - chunk_released = FLB_TRUE; + if (chunk_released) { + released_space += chunk_size; } - } - if (chunk_released) { - released_space += chunk_size; + if (released_space >= *required_space) { + break; + } } if (released_space >= *required_space) { @@ -489,12 +554,7 @@ ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic) } meta_size = cio_meta_size(ic->chunk); - size += meta_size - /* See https://github.com/edsiper/chunkio#file-layout for more details */ - + 2 /* HEADER BYTES */ - + 4 /* CRC32 */ - + 16 /* PADDING */ - + 2; /* METADATA LENGTH BYTES */ + size += meta_size + FLB_INPUT_CHUNK_FILE_HEADER_SIZE; return size; } @@ -522,6 +582,39 @@ int flb_input_chunk_write_at(void *data, off_t offset, return ret; } +int flb_input_chunk_release_route(struct flb_input_chunk *ic, + struct flb_output_instance *o_ins) +{ + ssize_t bytes; + + if (ic == NULL || o_ins == NULL || ic->routes_mask == NULL) { + return -1; + } + + if (!flb_routes_mask_get_bit(ic->routes_mask, + o_ins->id, + o_ins->config->router)) { + return 0; + } + + if (o_ins->total_limit_size != -1 && ic->fs_counted == FLB_TRUE) { + bytes = flb_input_chunk_get_real_size(ic); + if (bytes < 0) { + flb_debug("[input chunk] could not retrieve chunk real size"); + 
return -1; + } + + FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, bytes); + o_ins->fs_chunks_size -= bytes; + } + + flb_routes_mask_clear_bit(ic->routes_mask, + o_ins->id, + o_ins->config->router); + + return 0; +} + static int flb_input_chunk_drop_task_route( struct flb_task *task, struct flb_output_instance *output_plugin, @@ -615,9 +708,169 @@ static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic, return FLB_TRUE; } +static int flb_input_chunk_has_other_routes(struct flb_input_chunk *ic, + struct flb_output_instance *o_ins) +{ + struct mk_list *head; + struct flb_output_instance *candidate; + + mk_list_foreach(head, &ic->in->config->outputs) { + candidate = mk_list_entry(head, struct flb_output_instance, _head); + + if (candidate == o_ins) { + continue; + } + + if (flb_routes_mask_get_bit(ic->routes_mask, + candidate->id, + ic->in->config->router) != 0) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +static int flb_input_chunk_prefers_physical_delete( + struct flb_input_chunk *ic, + struct flb_output_instance *o_ins, + int release_scope) +{ + if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL) { + if (ic->task == NULL) { + return FLB_TRUE; + } + + return flb_input_chunk_is_task_safe_delete(ic->task); + } + + if (flb_input_chunk_has_other_routes(ic, o_ins) == FLB_TRUE) { + return FLB_FALSE; + } + + if (ic->task == NULL) { + return FLB_TRUE; + } + + return flb_input_chunk_is_task_safe_delete(ic->task); +} + +static size_t flb_input_chunk_get_projected_write_size( + struct flb_input_chunk *ic, + size_t append_size) +{ + size_t increment_size; + size_t increments; + size_t needed_size; + size_t page_size; + size_t meta_size; + size_t content_size; + size_t logical_size; + size_t alloc_size; + size_t current_size; + size_t realloc_size; + size_t projected_size; + size_t target_size; + ssize_t real_size; + ssize_t meta_size_value; + ssize_t content_size_value; + struct cio_chunk *chunk; + struct flb_input_chunk_cio_file_view *chunk_file; + + page_size 
= 4096; + chunk = (struct cio_chunk *) ic->chunk; + chunk_file = NULL; + + if (chunk != NULL && chunk->ctx != NULL && chunk->ctx->page_size > 0) { + page_size = (size_t) chunk->ctx->page_size; + } + + if (chunk != NULL && chunk->backend != NULL && + chunk->st != NULL && chunk->st->type == CIO_STORE_FS) { + chunk_file = (struct flb_input_chunk_cio_file_view *) chunk->backend; + } + + meta_size_value = cio_meta_size(ic->chunk); + content_size_value = flb_input_chunk_get_size(ic); + + if (meta_size_value < 0 || content_size_value < 0) { + return SIZE_MAX; + } + + meta_size = (size_t) meta_size_value; + content_size = (size_t) content_size_value; + + if (meta_size > SIZE_MAX - FLB_INPUT_CHUNK_FILE_HEADER_SIZE || + content_size > SIZE_MAX - meta_size - FLB_INPUT_CHUNK_FILE_HEADER_SIZE) { + return SIZE_MAX; + } + + logical_size = content_size + meta_size + FLB_INPUT_CHUNK_FILE_HEADER_SIZE; + if (logical_size > SIZE_MAX - append_size) { + return SIZE_MAX; + } + target_size = logical_size + append_size; + projected_size = target_size; + + alloc_size = 0; + realloc_size = page_size; + + if (chunk_file != NULL) { + alloc_size = chunk_file->alloc_size; + if (chunk_file->realloc_size > 0) { + realloc_size = chunk_file->realloc_size; + } + } + + if (alloc_size == 0) { + alloc_size = page_size; + } + + if (target_size > alloc_size) { + projected_size = alloc_size; + needed_size = target_size - projected_size; + + increments = needed_size / realloc_size; + if ((needed_size % realloc_size) != 0) { + increments++; + } + + if (increments > (SIZE_MAX - projected_size) / realloc_size) { + return SIZE_MAX; + } + + increment_size = increments * realloc_size; + projected_size += increment_size; + + if (projected_size > SIZE_MAX - (page_size - 1)) { + return SIZE_MAX; + } + + projected_size = ((projected_size + page_size - 1) / page_size) * page_size; + } + else { + projected_size = alloc_size; + } + + current_size = 0; + if (ic->fs_counted == FLB_TRUE) { + real_size = 
flb_input_chunk_get_real_size(ic); + if (real_size > 0) { + current_size = (size_t) real_size; + } + } + + if (projected_size > current_size) { + return projected_size - current_size; + } + + return 0; +} + int flb_input_chunk_release_space_compound( struct flb_input_chunk *new_input_chunk, struct flb_output_instance *output_plugin, + size_t required_space, size_t *local_release_requirement, int release_local_space) { @@ -629,10 +882,11 @@ int flb_input_chunk_release_space_compound( storage_backlog_instance = output_plugin->config->storage_input_plugin; - *local_release_requirement = flb_input_chunk_get_real_size(new_input_chunk); + *local_release_requirement = required_space; required_space_remainder = (ssize_t) *local_release_requirement; - if (required_space_remainder > 0) { + if (required_space_remainder > 0 && + storage_backlog_instance != NULL) { result = flb_input_chunk_release_space(new_input_chunk, storage_backlog_instance, output_plugin, @@ -721,6 +975,7 @@ int flb_input_chunk_find_space_new_data(struct flb_input_chunk *ic, result = flb_input_chunk_release_space_compound( ic, o_ins, + chunk_size, &local_release_requirement, FLB_TRUE); @@ -2296,6 +2551,7 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, int ret; int new_chunk = FLB_FALSE; size_t out_size; + size_t placement_size; struct flb_input_chunk *ic = NULL; if (tag_len > FLB_INPUT_CHUNK_TAG_MAX) { @@ -2370,8 +2626,10 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, * that the chunk will flush to, we need to modify the routes_mask of the oldest chunks * (based in creation time) to get enough space for the incoming chunk. 
*/ + placement_size = flb_input_chunk_get_projected_write_size(ic, chunk_size); + if (!flb_routes_mask_is_empty(ic->routes_mask, ic->in->config->router) - && flb_input_chunk_place_new_chunk(ic, chunk_size) == 0) { + && flb_input_chunk_place_new_chunk(ic, placement_size) == 0) { /* * If the chunk is not newly created, the chunk might already have logs inside. * We cannot delete (reused) chunks here. diff --git a/tests/integration/scenarios/in_forward/config/in_forward_storage_limit_shared_success_output.yaml b/tests/integration/scenarios/in_forward/config/in_forward_storage_limit_shared_success_output.yaml new file mode 100644 index 00000000000..9f0a6a7798a --- /dev/null +++ b/tests/integration/scenarios/in_forward/config/in_forward_storage_limit_shared_success_output.yaml @@ -0,0 +1,34 @@ +service: + flush: 1 + grace: 5 + log_level: info + scheduler.base: 60 + scheduler.cap: 60 + storage.path: ${FORWARD_STORAGE_PATH} + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: forward + listen: 127.0.0.1 + port: ${FLUENT_BIT_TEST_LISTENER_PORT} + storage.type: filesystem + + outputs: + - name: file + alias: file.1 + match: shared.* + path: ${FORWARD_FILE_OUTPUT_PATH} + file: delivered.log + + - name: http + alias: non_working_endpoint + match: shared.* + host: 127.0.0.1 + port: ${TEST_SUITE_HTTP_PORT} + uri: /shared + format: json + json_date_key: false + retry_limit: false + storage.total_limit_size: 10K diff --git a/tests/integration/scenarios/in_forward/tests/test_in_forward_001.py b/tests/integration/scenarios/in_forward/tests/test_in_forward_001.py index 92ce2940b77..acd0f08f719 100644 --- a/tests/integration/scenarios/in_forward/tests/test_in_forward_001.py +++ b/tests/integration/scenarios/in_forward/tests/test_in_forward_001.py @@ -141,13 +141,16 @@ class StorageLimitService(Service): def __init__(self, config_file): super().__init__(config_file) self.storage_path = tempfile.mkdtemp(prefix="fluent_bit_forward_storage_") 
+ self.file_output_path = tempfile.mkdtemp(prefix="fluent_bit_forward_file_output_") self.service.extra_env["FORWARD_STORAGE_PATH"] = self.storage_path + self.service.extra_env["FORWARD_FILE_OUTPUT_PATH"] = self.file_output_path def stop(self): try: super().stop() finally: shutil.rmtree(self.storage_path, ignore_errors=True) + shutil.rmtree(self.file_output_path, ignore_errors=True) def count_chunk_files(self): stream_dir = Path(self.storage_path) / "forward.0" @@ -163,6 +166,36 @@ def chunk_file_contents(self): return [path.read_bytes() for path in stream_dir.rglob("*.flb") if path.is_file()] + def file_output_contents(self): + output_dir = Path(self.file_output_path) + if not output_dir.exists(): + return "" + + contents = [] + for path in output_dir.rglob("*"): + if path.is_file(): + contents.append(path.read_text(encoding="utf-8", errors="replace")) + + return "\n".join(contents) + + def wait_for_file_output_contains(self, text, timeout=10): + return self.service.wait_for_condition( + lambda: self.file_output_contents() + if text in self.file_output_contents() + else None, + timeout=timeout, + interval=0.2, + description=f"file output text {text!r}", + ) + + def wait_for_http_request_count(self, minimum_count, timeout=10): + return self.service.wait_for_condition( + lambda: data_storage["requests"] if len(data_storage["requests"]) >= minimum_count else None, + timeout=timeout, + interval=0.2, + description=f"{minimum_count} retrying HTTP output requests", + ) + class ForwardReceiverService: def __init__(self, config_file, *, extra_env=None): @@ -1437,3 +1470,70 @@ def multi_output_eviction_snapshot(): assert any(b"shared.one" in content for content in chunk_contents) assert not any(b"solo.one" in content for content in chunk_contents) + + +def test_in_forward_storage_limit_shared_success_route_deletes_old_chunk(): + service = StorageLimitService("in_forward_storage_limit_shared_success_output.yaml") + timeout = 30 if os.environ.get("VALGRIND") else 10 + 
service.start() + + try: + configure_http_response(status_code=500, body={"status": "retry"}) + + _send_tcp_payload( + service.flb_listener_port, + _message_mode_payload("shared.one", {"message": "shared-one"}), + ) + service.wait_for_file_output_contains("shared-one", timeout=timeout) + service.wait_for_http_request_count(1, timeout=timeout) + + _send_tcp_payload( + service.flb_listener_port, + _message_mode_payload("shared.two", {"message": "shared-two"}), + ) + service.wait_for_file_output_contains("shared-two", timeout=timeout) + service.wait_for_http_request_count(2, timeout=timeout) + + service.service.wait_for_condition( + lambda: service.count_chunk_files() == 2, + timeout=timeout, + interval=0.2, + description="2 shared chunk files before stale route eviction", + ) + + _send_tcp_payload( + service.flb_listener_port, + _message_mode_payload("shared.three", {"message": "shared-three"}), + ) + service.wait_for_file_output_contains("shared-three", timeout=timeout) + service.wait_for_http_request_count(3, timeout=timeout) + + def shared_success_eviction_snapshot(): + chunk_contents = service.chunk_file_contents() + + if len(chunk_contents) != 2: + return None + + if any(b"shared-one" in content for content in chunk_contents): + return None + + if not any(b"shared-two" in content for content in chunk_contents): + return None + + if not any(b"shared-three" in content for content in chunk_contents): + return None + + return chunk_contents + + chunk_contents = service.service.wait_for_condition( + shared_success_eviction_snapshot, + timeout=timeout, + interval=0.2, + description="shared-output stale route eviction snapshot", + ) + finally: + service.stop() + + assert not any(b"shared-one" in content for content in chunk_contents) + assert any(b"shared-two" in content for content in chunk_contents) + assert any(b"shared-three" in content for content in chunk_contents) diff --git a/tests/internal/input_chunk.c b/tests/internal/input_chunk.c index 
827957604f9..d9b51c85d07 100644 --- a/tests/internal/input_chunk.c +++ b/tests/internal/input_chunk.c @@ -1,6 +1,7 @@ /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ #include +#include #include #include #include @@ -75,6 +76,51 @@ static int file_to_buf(const char *path, char **out_buf, size_t *out_size) return 0; } +static int count_chunk_files(const char *path) +{ + int total; + size_t name_len; + struct stat st; + struct dirent *entry; + DIR *dir; + char full_path[PATH_MAX]; + + total = 0; + dir = opendir(path); + if (dir == NULL) { + return 0; + } + + while ((entry = readdir(dir)) != NULL) { + if (strcmp(entry->d_name, ".") == 0 || + strcmp(entry->d_name, "..") == 0) { + continue; + } + + snprintf(full_path, sizeof(full_path) - 1, "%s/%s", path, entry->d_name); + full_path[sizeof(full_path) - 1] = '\0'; + + if (stat(full_path, &st) != 0) { + continue; + } + + if (S_ISDIR(st.st_mode)) { + total += count_chunk_files(full_path); + continue; + } + + name_len = strlen(entry->d_name); + if (name_len > 4 && + strcmp(entry->d_name + name_len - 4, ".flb") == 0) { + total++; + } + } + + closedir(dir); + + return total; +} + /* Given a target, lookup the .out file and return it content in a tail_file_lines structure */ static struct tail_file_lines *get_out_file_content(const char *target) { @@ -920,6 +966,163 @@ void flb_test_input_chunk_grouped_release_space_drop_counters(void) flb_free(storage_path); } +void flb_test_input_chunk_prefers_deletable_files_on_limit(void) +{ + int records; + int chunk_file_count; + struct flb_input_instance *i_ins; + struct flb_output_instance *o_shared; + struct flb_output_instance *o_solo; + struct mk_list *head; + struct mk_list *tmp; + struct flb_input_chunk *ic; + struct flb_task *task; + struct flb_config *cfg; + struct cio_ctx *cio; + struct mk_event_loop *evl; + struct cio_options opts = {0}; + char *root_path; + char stream_path[PATH_MAX]; + char temp_path[128]; + char buf[2048]; + size_t 
shared_chunk_size; + size_t solo_chunk_size; + size_t total_limit; + + snprintf(temp_path, sizeof(temp_path) - 1, + "/input-chunk-prefer-deletable-files-%i/", + getpid()); + temp_path[sizeof(temp_path) - 1] = '\0'; + + root_path = flb_test_tmpdir_cat(temp_path); + TEST_CHECK(root_path != NULL); + if (!root_path) { + return; + } + + memset(buf, 0x5A, sizeof(buf)); + + flb_init_env(); + cfg = flb_config_init(); + evl = mk_event_loop_create(256); + + TEST_CHECK(evl != NULL); + if (!evl) { + flb_config_exit(cfg); + flb_free(root_path); + return; + } + + cfg->evl = evl; + flb_log_create(cfg, FLB_LOG_STDERR, FLB_LOG_DEBUG, NULL); + + i_ins = flb_input_new(cfg, "dummy", NULL, FLB_TRUE); + TEST_CHECK(i_ins != NULL); + if (!i_ins) { + flb_config_exit(cfg); + flb_free(root_path); + return; + } + i_ins->storage_type = CIO_STORE_FS; + + cio_options_init(&opts); + opts.root_path = root_path; + opts.log_cb = log_cb; + opts.log_level = CIO_LOG_DEBUG; + opts.flags = CIO_OPEN; + + cio = cio_create(&opts); + TEST_CHECK(cio != NULL); + if (!cio) { + flb_input_exit_all(cfg); + flb_output_exit(cfg); + flb_config_exit(cfg); + flb_free(root_path); + return; + } + + flb_storage_input_create(cio, i_ins); + flb_input_init_all(cfg); + + snprintf(stream_path, sizeof(stream_path) - 1, + "%s/%s", root_path, i_ins->name); + stream_path[sizeof(stream_path) - 1] = '\0'; + + o_shared = flb_output_new(cfg, "http", NULL, FLB_TRUE); + o_solo = flb_output_new(cfg, "http", NULL, FLB_TRUE); + TEST_CHECK(o_shared != NULL); + TEST_CHECK(o_solo != NULL); + if (!o_shared || !o_solo) { + cio_destroy(cio); + flb_input_exit_all(cfg); + flb_output_exit(cfg); + flb_config_exit(cfg); + flb_free(root_path); + return; + } + + o_shared->id = 0; + o_solo->id = 1; + + flb_output_set_property(o_shared, "match", "shared.*"); + flb_output_set_property(o_shared, "storage.total_limit_size", "10M"); + flb_output_set_property(o_solo, "match", "*"); + flb_output_set_property(o_solo, "storage.total_limit_size", "10M"); + + 
TEST_CHECK_(flb_router_io_set(cfg) != -1, "unable to router"); + + records = flb_mp_count(buf, sizeof(buf)); + + TEST_CHECK(flb_input_chunk_append_raw(i_ins, FLB_INPUT_LOGS, + records, "shared.one", 10, + buf, sizeof(buf)) == 0); + ic = mk_list_entry_last(&i_ins->chunks, struct flb_input_chunk, _head); + shared_chunk_size = flb_input_chunk_get_real_size(ic); + + TEST_CHECK(flb_input_chunk_append_raw(i_ins, FLB_INPUT_LOGS, + records, "solo.one", 8, + buf, sizeof(buf)) == 0); + ic = mk_list_entry_last(&i_ins->chunks, struct flb_input_chunk, _head); + solo_chunk_size = flb_input_chunk_get_real_size(ic); + + total_limit = shared_chunk_size + solo_chunk_size + (solo_chunk_size / 2); + o_solo->total_limit_size = total_limit; + + chunk_file_count = count_chunk_files(stream_path); + TEST_CHECK(chunk_file_count == 2); + + TEST_CHECK(flb_input_chunk_append_raw(i_ins, FLB_INPUT_LOGS, + records, "solo.two", 8, + buf, sizeof(buf)) == 0); + + chunk_file_count = count_chunk_files(stream_path); + + /* + * The oldest chunk is shared with another output and cannot be unlinked + * by only dropping a single route. When the solo output needs space, we + * should prefer the next chunk that can actually be deleted. 
+ */ + TEST_CHECK(chunk_file_count == 2); + TEST_CHECK(mk_list_size(&i_ins->chunks) == 2); + + mk_list_foreach_safe(head, tmp, &i_ins->tasks) { + task = mk_list_entry(head, struct flb_task, _head); + flb_task_destroy(task, FLB_TRUE); + } + + mk_list_foreach_safe(head, tmp, &i_ins->chunks) { + ic = mk_list_entry(head, struct flb_input_chunk, _head); + flb_input_chunk_destroy(ic, FLB_TRUE); + } + + cio_destroy(cio); + flb_router_exit(cfg); + flb_input_exit_all(cfg); + flb_output_exit(cfg); + flb_config_exit(cfg); + flb_free(root_path); +} + /* Test list */ TEST_LIST = { @@ -931,5 +1134,7 @@ TEST_LIST = { {"input_chunk_grouped_auto_records", flb_test_input_chunk_grouped_auto_records}, {"input_chunk_grouped_release_space_drop_counters", flb_test_input_chunk_grouped_release_space_drop_counters}, + {"input_chunk_prefers_deletable_files_on_limit", + flb_test_input_chunk_prefers_deletable_files_on_limit}, {NULL, NULL} };