diff --git a/src/main/java/org/embulk/parser/jsonl/JsonlParserPlugin.java b/src/main/java/org/embulk/parser/jsonl/JsonlParserPlugin.java index 2107aff..cf6184e 100644 --- a/src/main/java/org/embulk/parser/jsonl/JsonlParserPlugin.java +++ b/src/main/java/org/embulk/parser/jsonl/JsonlParserPlugin.java @@ -155,9 +155,7 @@ public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutpu final SchemaConfig schemaConfig = getSchemaConfig(task); final TimestampFormatter[] timestampFormatters = newTimestampFormatters(task, schemaConfig); final Charset charset = Charset.forName(task.getCharset()); - final Newline newline = Newline.valueOf(task.getNewline()); - final LineDelimiter lineDelimiter = newlineToLineDelimiter(newline); - final LineDecoder decoder = LineDecoder.of(input, charset, lineDelimiter); + final LineDecoder decoder = LineDecoder.of(input, charset, null); final JsonParser jsonParser = newJsonParser(); final boolean stopOnInvalidRecord = task.getStopOnInvalidRecord(); diff --git a/src/test/java/org/embulk/parser/jsonl/TestJsonlParserPlugin.java b/src/test/java/org/embulk/parser/jsonl/TestJsonlParserPlugin.java index 2538b4f..597b554 100644 --- a/src/test/java/org/embulk/parser/jsonl/TestJsonlParserPlugin.java +++ b/src/test/java/org/embulk/parser/jsonl/TestJsonlParserPlugin.java @@ -364,6 +364,78 @@ public void testEmptyLinesWithStopOnInvalidRecord() throws Exception { assertEquals(2, records.size()); } + /** + * Test case: Config specifies LF, but actual data uses CRLF. This should process all 3 lines, not + * just the first one. + */ + @Test + public void testMismatchConfigLFDataCRLF() throws Exception { + SchemaConfig schema = schema(column("id", LONG), column("name", STRING)); + ConfigSource config = config().set("columns", schema).set("newline", "LF"); + + // Create test data with CRLF line endings + String testData = + "{\"id\":1,\"name\":\"Alice\"}\r\n" + + "{\"id\":2,\"name\":\"Bob\"}\r\n" + + "{\"id\":3,\"name\":\"Charlie\"}\r\n"; + + List records = runParser(config, testData); + + // Should process all 3 records, not just 1 + assertEquals("Should process all 3 records with CRLF data", 3, records.size()); + assertEquals(1L, records.get(0)[0]); + assertEquals("Alice", records.get(0)[1]); + assertEquals(2L, records.get(1)[0]); + assertEquals("Bob", records.get(1)[1]); + assertEquals(3L, records.get(2)[0]); + assertEquals("Charlie", records.get(2)[1]); + } + + /** + * Test case: Config specifies CRLF, but actual data uses LF. This should process all 3 lines, not + * just the first one. + */ + @Test + public void testMismatchConfigCRLFDataLF() throws Exception { + SchemaConfig schema = schema(column("id", LONG), column("name", STRING)); + ConfigSource config = config().set("columns", schema).set("newline", "CRLF"); + + // Create test data with LF line endings + String testData = + "{\"id\":1,\"name\":\"Alice\"}\n" + + "{\"id\":2,\"name\":\"Bob\"}\n" + + "{\"id\":3,\"name\":\"Charlie\"}\n"; + + List records = runParser(config, testData); + + // Should process all 3 records, not just 1 + assertEquals("Should process all 3 records with LF data", 3, records.size()); + assertEquals(1L, records.get(0)[0]); + assertEquals("Alice", records.get(0)[1]); + assertEquals(2L, records.get(1)[0]); + assertEquals("Bob", records.get(1)[1]); + assertEquals(3L, records.get(2)[0]); + assertEquals("Charlie", records.get(2)[1]); + } + + /** Test case: Config specifies CR, but actual data uses LF. */ + @Test + public void testMismatchConfigCRDataLF() throws Exception { + SchemaConfig schema = schema(column("id", LONG), column("name", STRING)); + ConfigSource config = config().set("columns", schema).set("newline", "CR"); + + // Create test data with LF line endings + String testData = + "{\"id\":1,\"name\":\"Alice\"}\n" + + "{\"id\":2,\"name\":\"Bob\"}\n" + + "{\"id\":3,\"name\":\"Charlie\"}\n"; + + List records = runParser(config, testData); + + // Should process all 3 records, not just 1 + assertEquals("Should process all 3 records with LF data and CR config", 3, records.size()); + } + private ConfigSource config() { return CONFIG_MAPPER_FACTORY.newConfigSource(); } @@ -388,6 +460,27 @@ public void run(TaskSource taskSource, Schema schema) { return readRecords(schemaRef[0], output.pages); } + private List runParser(ConfigSource config, String testData) { + plugin = new JsonlParserPlugin(); + MockPageOutput output = new MockPageOutput(); + final ByteArrayInputStream inputStream = + new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + final Schema[] schemaRef = new Schema[1]; + + plugin.transaction( + config, + new ParserPlugin.Control() { + @Override + public void run(TaskSource taskSource, Schema schema) { + schemaRef[0] = schema; + FileInput input = createFileInput(inputStream); + plugin.run(taskSource, schema, input, output); + } + }); + + return readRecords(schemaRef[0], output.pages); + } + private ByteArrayInputStream createInputStream(List lines) { StringBuilder sb = new StringBuilder(); for (String line : lines) {