Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/main/java/org/embulk/parser/jsonl/JsonlParserPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,7 @@ public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutpu
final SchemaConfig schemaConfig = getSchemaConfig(task);
final TimestampFormatter[] timestampFormatters = newTimestampFormatters(task, schemaConfig);
final Charset charset = Charset.forName(task.getCharset());
final Newline newline = Newline.valueOf(task.getNewline());
final LineDelimiter lineDelimiter = newlineToLineDelimiter(newline);
final LineDecoder decoder = LineDecoder.of(input, charset, lineDelimiter);
final LineDecoder decoder = LineDecoder.of(input, charset, null);
final JsonParser jsonParser = newJsonParser();
final boolean stopOnInvalidRecord = task.getStopOnInvalidRecord();

Expand Down
93 changes: 93 additions & 0 deletions src/test/java/org/embulk/parser/jsonl/TestJsonlParserPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,78 @@ public void testEmptyLinesWithStopOnInvalidRecord() throws Exception {
assertEquals(2, records.size());
}

/**
* Test case: Config specifies LF, but actual data uses CRLF. This should process all 3 lines, not
* just the first one.
*/
@Test
public void testMismatchConfigLFDataCRLF() throws Exception {
SchemaConfig schema = schema(column("id", LONG), column("name", STRING));
ConfigSource config = config().set("columns", schema).set("newline", "LF");

// Create test data with CRLF line endings
String testData =
"{\"id\":1,\"name\":\"Alice\"}\r\n"
+ "{\"id\":2,\"name\":\"Bob\"}\r\n"
+ "{\"id\":3,\"name\":\"Charlie\"}\r\n";

List<Object[]> records = runParser(config, testData);

// Should process all 3 records, not just 1
assertEquals("Should process all 3 records with CRLF data", 3, records.size());
assertEquals(1L, records.get(0)[0]);
assertEquals("Alice", records.get(0)[1]);
assertEquals(2L, records.get(1)[0]);
assertEquals("Bob", records.get(1)[1]);
assertEquals(3L, records.get(2)[0]);
assertEquals("Charlie", records.get(2)[1]);
}

/**
* Test case: Config specifies CRLF, but actual data uses LF. This should process all 3 lines, not
* just the first one.
*/
@Test
public void testMismatchConfigCRLFDataLF() throws Exception {
SchemaConfig schema = schema(column("id", LONG), column("name", STRING));
ConfigSource config = config().set("columns", schema).set("newline", "CRLF");

// Create test data with LF line endings
String testData =
"{\"id\":1,\"name\":\"Alice\"}\n"
+ "{\"id\":2,\"name\":\"Bob\"}\n"
+ "{\"id\":3,\"name\":\"Charlie\"}\n";

List<Object[]> records = runParser(config, testData);

// Should process all 3 records, not just 1
assertEquals("Should process all 3 records with LF data", 3, records.size());
assertEquals(1L, records.get(0)[0]);
assertEquals("Alice", records.get(0)[1]);
assertEquals(2L, records.get(1)[0]);
assertEquals("Bob", records.get(1)[1]);
assertEquals(3L, records.get(2)[0]);
assertEquals("Charlie", records.get(2)[1]);
}

/** Test case: Config specifies CR, but actual data uses LF. */
@Test
public void testMismatchConfigCRDataLF() throws Exception {
SchemaConfig schema = schema(column("id", LONG), column("name", STRING));
ConfigSource config = config().set("columns", schema).set("newline", "CR");

// Create test data with LF line endings
String testData =
"{\"id\":1,\"name\":\"Alice\"}\n"
+ "{\"id\":2,\"name\":\"Bob\"}\n"
+ "{\"id\":3,\"name\":\"Charlie\"}\n";

List<Object[]> records = runParser(config, testData);

// Should process all 3 records, not just 1
assertEquals("Should process all 3 records with LF data and CR config", 3, records.size());
}

private ConfigSource config() {
return CONFIG_MAPPER_FACTORY.newConfigSource();
}
Expand All @@ -388,6 +460,27 @@ public void run(TaskSource taskSource, Schema schema) {
return readRecords(schemaRef[0], output.pages);
}

private List<Object[]> runParser(ConfigSource config, String testData) {
plugin = new JsonlParserPlugin();
MockPageOutput output = new MockPageOutput();
final ByteArrayInputStream inputStream =
new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8));
final Schema[] schemaRef = new Schema[1];

plugin.transaction(
config,
new ParserPlugin.Control() {
@Override
public void run(TaskSource taskSource, Schema schema) {
schemaRef[0] = schema;
FileInput input = createFileInput(inputStream);
plugin.run(taskSource, schema, input, output);
}
});

return readRecords(schemaRef[0], output.pages);
}

private ByteArrayInputStream createInputStream(List<String> lines) {
StringBuilder sb = new StringBuilder();
for (String line : lines) {
Expand Down
Loading