diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 55006a6..1a680b7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,16 +13,16 @@ jobs: name: Test and Check runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1 - name: Set up JDK 1.8 - uses: actions/setup-java@v4 + uses: actions/setup-java@c1e323688fd81a25caa38c78aa6df2d33d3e20d9 # v4.8.0 with: java-version: '8' distribution: 'temurin' - name: Setup Gradle - uses: gradle/actions/setup-gradle@v3 + uses: gradle/actions/setup-gradle@d9c87d481d55275bb5441eef3fe0e46805f9ef70 # v3.5.0 - name: Run Spotless Check run: ./gradlew spotlessCheck @@ -32,7 +32,7 @@ jobs: - name: Upload Test Results if: always() - uses: actions/upload-artifact@v4 + uses: actions/upload-artifact@6f51ac03b9356f520e9adb1b1b7802705f340c2b # v4.5.0 with: name: test-results path: build/reports/tests/test/ diff --git a/.github/workflows/gem-push.yml b/.github/workflows/gem-push.yml index 98ad172..24d8da7 100644 --- a/.github/workflows/gem-push.yml +++ b/.github/workflows/gem-push.yml @@ -16,14 +16,14 @@ jobs: packages: write contents: read steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1 - name: Set up Java8 - uses: actions/setup-java@v4 + uses: actions/setup-java@c1e323688fd81a25caa38c78aa6df2d33d3e20d9 # v4.8.0 with: distribution: "temurin" java-version: "8" - name: push gem - uses: trocco-io/push-gem-to-gpr-action@v2 + uses: trocco-io/push-gem-to-gpr-action@25bf27a8aab8b3c95e2aa19189d9815942fdfabf # v2 with: language: java gem-path: "./build/gems/*.gem" diff --git a/build.gradle b/build.gradle index a34bb7b..e0d69c6 100644 --- a/build.gradle +++ b/build.gradle @@ -37,6 +37,9 @@ gem { summary = "Jsonl parser plugin for Embulk" homepage = "https://github.com/shun0102/embulk-parser-jsonl" licenses = ["MIT"] + from("lib") { + into("lib") + } } dependencies { @@ 
-47,6 +50,7 @@ dependencies { implementation "org.embulk:embulk-util-json:0.3.0" implementation "org.embulk:embulk-util-timestamp:0.2.2" implementation "org.embulk:embulk-util-text:0.1.1" + implementation "org.embulk:embulk-util-guess:0.3.0" testImplementation "junit:junit:4.+" testImplementation "org.embulk:embulk-api:0.10.43" diff --git a/gradle.lockfile b/gradle.lockfile index 8ff5438..ec356de 100644 --- a/gradle.lockfile +++ b/gradle.lockfile @@ -1,19 +1,69 @@ # This is a Gradle generated file for dependency locking. # Manual edits can break the build and are not advised. # This file is expected to be part of source control. +com.fasterxml.jackson.core:jackson-annotations:2.16.2=testCompileClasspath,testRuntimeClasspath com.fasterxml.jackson.core:jackson-annotations:2.6.7=compileClasspath,runtimeClasspath +com.fasterxml.jackson.core:jackson-core:2.16.2=testCompileClasspath,testRuntimeClasspath com.fasterxml.jackson.core:jackson-core:2.6.7=compileClasspath,runtimeClasspath +com.fasterxml.jackson.core:jackson-databind:2.16.2=testCompileClasspath,testRuntimeClasspath com.fasterxml.jackson.core:jackson-databind:2.6.7.5=compileClasspath,runtimeClasspath +com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.16.2=testCompileClasspath,testRuntimeClasspath com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7=compileClasspath,runtimeClasspath -javax.validation:validation-api:1.1.0.Final=compileClasspath,runtimeClasspath -org.embulk:embulk-api:0.10.43=compileClasspath -org.embulk:embulk-spi:0.11=compileClasspath -org.embulk:embulk-util-config:0.3.4=compileClasspath,runtimeClasspath -org.embulk:embulk-util-file:0.1.3=compileClasspath,runtimeClasspath -org.embulk:embulk-util-json:0.3.0=compileClasspath,runtimeClasspath +com.fasterxml.jackson:jackson-bom:2.16.2=testCompileClasspath,testRuntimeClasspath +commons-cli:commons-cli:1.7.0=testCompileClasspath,testRuntimeClasspath +commons-codec:commons-codec:1.17.0=testCompileClasspath,testRuntimeClasspath 
+io.netty:netty-buffer:4.1.109.Final=testCompileClasspath,testRuntimeClasspath +io.netty:netty-common:4.1.109.Final=testCompileClasspath,testRuntimeClasspath +javax.inject:javax.inject:1=testCompileClasspath,testRuntimeClasspath +javax.validation:validation-api:1.1.0.Final=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +junit:junit:4.13.2=testCompileClasspath,testRuntimeClasspath +org.apache.commons:commons-lang3:3.14.0=testCompileClasspath,testRuntimeClasspath +org.apache.httpcomponents:httpclient:4.5.14=testCompileClasspath,testRuntimeClasspath +org.apache.httpcomponents:httpcore:4.4.16=testCompileClasspath,testRuntimeClasspath +org.apache.maven.resolver:maven-resolver-api:1.9.20=testCompileClasspath,testRuntimeClasspath +org.apache.maven.resolver:maven-resolver-connector-basic:1.9.20=testCompileClasspath,testRuntimeClasspath +org.apache.maven.resolver:maven-resolver-impl:1.9.20=testCompileClasspath,testRuntimeClasspath +org.apache.maven.resolver:maven-resolver-named-locks:1.9.20=testCompileClasspath,testRuntimeClasspath +org.apache.maven.resolver:maven-resolver-spi:1.9.20=testCompileClasspath,testRuntimeClasspath +org.apache.maven.resolver:maven-resolver-supplier:1.9.20=testCompileClasspath,testRuntimeClasspath +org.apache.maven.resolver:maven-resolver-transport-file:1.9.20=testCompileClasspath,testRuntimeClasspath +org.apache.maven.resolver:maven-resolver-transport-http:1.9.20=testCompileClasspath,testRuntimeClasspath +org.apache.maven.resolver:maven-resolver-util:1.9.20=testCompileClasspath,testRuntimeClasspath +org.apache.maven:maven-artifact:3.9.6=testCompileClasspath,testRuntimeClasspath +org.apache.maven:maven-builder-support:3.9.6=testCompileClasspath,testRuntimeClasspath +org.apache.maven:maven-model-builder:3.9.6=testCompileClasspath,testRuntimeClasspath +org.apache.maven:maven-model:3.9.6=testCompileClasspath,testRuntimeClasspath +org.apache.maven:maven-repository-metadata:3.9.6=testCompileClasspath,testRuntimeClasspath 
+org.apache.maven:maven-resolver-provider:3.9.6=testCompileClasspath,testRuntimeClasspath +org.codehaus.plexus:plexus-interpolation:1.26=testCompileClasspath,testRuntimeClasspath +org.codehaus.plexus:plexus-utils:3.5.1=testCompileClasspath,testRuntimeClasspath +org.eclipse.sisu:org.eclipse.sisu.inject:0.9.0.M2=testCompileClasspath,testRuntimeClasspath +org.embulk:embulk-api:0.10.43=compileClasspath,testCompileClasspath,testRuntimeClasspath +org.embulk:embulk-core:0.11.5=testCompileClasspath,testRuntimeClasspath +org.embulk:embulk-deps:0.11.5=testCompileClasspath,testRuntimeClasspath +org.embulk:embulk-junit4:0.11.5=testCompileClasspath,testRuntimeClasspath +org.embulk:embulk-spi:0.11=compileClasspath,testCompileClasspath,testRuntimeClasspath +org.embulk:embulk-util-config:0.3.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.embulk:embulk-util-file:0.1.3=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.embulk:embulk-util-guess:0.3.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.embulk:embulk-util-json:0.3.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.embulk:embulk-util-rubytime:0.3.3=compileClasspath,runtimeClasspath -org.embulk:embulk-util-text:0.1.1=compileClasspath,runtimeClasspath +org.embulk:embulk-util-rubytime:0.4.0=testCompileClasspath,testRuntimeClasspath +org.embulk:embulk-util-text:0.1.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.embulk:embulk-util-timestamp:0.2.2=compileClasspath,runtimeClasspath -org.msgpack:msgpack-core:0.8.24=compileClasspath -org.slf4j:slf4j-api:2.0.7=compileClasspath -empty= +org.embulk:embulk-util-timestamp:0.3.0=testCompileClasspath,testRuntimeClasspath +org.hamcrest:hamcrest-core:1.3=testCompileClasspath,testRuntimeClasspath +org.hamcrest:hamcrest-library:1.3=testCompileClasspath,testRuntimeClasspath +org.jacoco:org.jacoco.agent:0.8.9=jacocoAgent,jacocoAnt 
+org.jacoco:org.jacoco.ant:0.8.9=jacocoAnt
+org.jacoco:org.jacoco.core:0.8.9=jacocoAnt
+org.jacoco:org.jacoco.report:0.8.9=jacocoAnt
+org.msgpack:msgpack-core:0.8.24=compileClasspath,testCompileClasspath,testRuntimeClasspath
+org.ow2.asm:asm-commons:9.5=jacocoAnt
+org.ow2.asm:asm-tree:9.5=jacocoAnt
+org.ow2.asm:asm:9.5=jacocoAnt
+org.slf4j:jcl-over-slf4j:2.0.13=testCompileClasspath,testRuntimeClasspath
+org.slf4j:slf4j-api:2.0.13=testRuntimeClasspath
+org.slf4j:slf4j-api:2.0.7=compileClasspath,testCompileClasspath
+org.yaml:snakeyaml:2.2=testCompileClasspath,testRuntimeClasspath
+empty=annotationProcessor,testAnnotationProcessor
diff --git a/lib/embulk/guess/jsonl.rb b/lib/embulk/guess/jsonl.rb
new file mode 100644
index 0000000..8f1d975
--- /dev/null
+++ b/lib/embulk/guess/jsonl.rb
@@ -0,0 +1,5 @@
+# Register Java GuessPlugin directly
+Embulk::JavaPlugin.register_guess(
+  "jsonl", "org.embulk.parser.jsonl.JsonlGuessPlugin",
+  File.expand_path("../../../../classpath", __FILE__)
+)
diff --git a/src/main/java/org/embulk/parser/jsonl/JsonlGuessPlugin.java b/src/main/java/org/embulk/parser/jsonl/JsonlGuessPlugin.java
new file mode 100644
index 0000000..534cb53
--- /dev/null
+++ b/src/main/java/org/embulk/parser/jsonl/JsonlGuessPlugin.java
@@ -0,0 +1,158 @@
+package org.embulk.parser.jsonl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.embulk.config.ConfigDiff;
+import org.embulk.config.ConfigSource;
+import org.embulk.spi.Buffer;
+import org.embulk.spi.GuessPlugin;
+import org.embulk.util.config.ConfigMapperFactory;
+import org.embulk.util.guess.SchemaGuess;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Guess plugin for the {@code jsonl} parser: samples newline-delimited JSON objects and proposes
+ * a {@code columns} schema for them.
+ */
+public class JsonlGuessPlugin implements GuessPlugin {
+  private static final Logger log = LoggerFactory.getLogger(JsonlGuessPlugin.class);
+  private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY =
+      ConfigMapperFactory.builder().addDefaultModules().build();
+
+  /**
+   * Proposes a {@code parser.columns} schema from a sample of the input.
+   *
+   * @param config the current config; only its {@code parser} section is read
+   * @param sample raw bytes sampled from the input
+   * @return a diff holding {@code parser.type} and {@code parser.columns}, or an empty diff when
+   *     the parser type is not {@code jsonl}, too few rows were parsed, or guessing failed
+   */
+  @Override
+  @SuppressWarnings("deprecation")
+  public ConfigDiff guess(ConfigSource config, Buffer sample) {
+    try {
+      // A config without a "parser" section is not an error; it is simply not for this plugin.
+      ConfigSource parserConfig = config.getNestedOrGetEmpty("parser");
+      String parserType = parserConfig.get(String.class, "type", null);
+      if (!"jsonl".equals(parserType)) {
+        return CONFIG_MAPPER_FACTORY.newConfigDiff();
+      }
+
+      String newlineType = parserConfig.get(String.class, "newline", "CRLF");
+      String charset = parserConfig.get(String.class, "charset", "UTF-8");
+      int minRowsForGuess = parserConfig.get(Integer.class, "min_rows_for_guess", 4);
+
+      // Decode only the filled region: the backing array can be larger than the sampled data.
+      String sampleText =
+          new String(sample.array(), sample.offset(), sample.limit(), Charset.forName(charset));
+
+      List<LinkedHashMap<String, Object>> rows =
+          parseRows(sampleText, newlineCharacter(newlineType));
+
+      if (rows.isEmpty()) {
+        log.info("No JSON object rows found in the sample; nothing to guess");
+        return CONFIG_MAPPER_FACTORY.newConfigDiff();
+      }
+      if (rows.size() < minRowsForGuess) {
+        log.info("Not enough rows for guess: {} < {}", rows.size(), minRowsForGuess);
+        return CONFIG_MAPPER_FACTORY.newConfigDiff();
+      }
+
+      SchemaGuess schemaGuess = SchemaGuess.of(CONFIG_MAPPER_FACTORY);
+      List<ConfigDiff> guessedColumns = schemaGuess.fromLinkedHashMapRecords(rows);
+
+      // Copy each guessed column into a plain map so the optional "format" key is omitted
+      // entirely when absent.
+      List<Map<String, Object>> columns = new ArrayList<>();
+      for (ConfigDiff columnDiff : guessedColumns) {
+        Map<String, Object> columnConfig = new LinkedHashMap<>();
+        columnConfig.put("name", columnDiff.get(String.class, "name"));
+        columnConfig.put("type", columnDiff.get(String.class, "type"));
+        String format = columnDiff.get(String.class, "format", null);
+        if (format != null) {
+          columnConfig.put("format", format);
+        }
+        columns.add(columnConfig);
+      }
+
+      ConfigDiff result = CONFIG_MAPPER_FACTORY.newConfigDiff();
+      result.setNested(
+          "parser",
+          CONFIG_MAPPER_FACTORY.newConfigDiff().set("type", "jsonl").set("columns", columns));
+      return result;
+    } catch (Exception e) {
+      // Guessing is best-effort: any failure is logged and reported as "no suggestion".
+      log.error("Failed to guess schema", e);
+      return CONFIG_MAPPER_FACTORY.newConfigDiff();
+    }
+  }
+
+  /** Parses every non-blank line of the sample that holds a JSON object. */
+  private List<LinkedHashMap<String, Object>> parseRows(String sampleText, String newline) {
+    ObjectMapper objectMapper = new ObjectMapper();
+    List<LinkedHashMap<String, Object>> rows = new ArrayList<>();
+    for (String line : sampleText.split(newline, -1)) {
+      if (line.trim().isEmpty()) {
+        continue;
+      }
+      try {
+        JsonNode jsonNode = objectMapper.readTree(line);
+        if (!jsonNode.isObject()) {
+          log.warn("Skipped non-object JSON value: {}", line);
+          continue;
+        }
+        LinkedHashMap<String, Object> row = new LinkedHashMap<>();
+        jsonNode
+            .fields()
+            .forEachRemaining(
+                entry -> {
+                  row.put(entry.getKey(), convertJsonNodeToObject(entry.getValue()));
+                });
+        rows.add(row);
+      } catch (IOException e) {
+        // Skip lines that are not valid JSON (for example a line cut off mid-record).
+        log.warn("Failed to parse line: {}", line, e);
+      }
+    }
+    return rows;
+  }
+
+  /** Maps a JSON value to the plain Java value handed to {@code SchemaGuess}. */
+  private Object convertJsonNodeToObject(JsonNode node) {
+    if (node.isNull()) {
+      return null;
+    } else if (node.isBoolean()) {
+      return node.asBoolean();
+    } else if (node.isIntegralNumber() && node.canConvertToLong()) {
+      // Covers int-sized values too; isLong() alone is false for them.
+      return node.asLong();
+    } else if (node.isFloatingPointNumber()) {
+      return node.asDouble();
+    } else if (node.isArray() || node.isObject()) {
+      // Nested structures are passed on as their JSON text.
+      return node.toString();
+    } else {
+      return node.asText();
+    }
+  }
+
+  /** Maps the {@code newline} option to its delimiter string; unknown values mean CRLF. */
+  private String newlineCharacter(String newlineType) {
+    switch (newlineType) {
+      case "LF":
+        return "\n";
+      case "CR":
+        return "\r";
+      case "CRLF":
+      default:
+        return "\r\n";
+    }
+  }
+}
diff --git a/src/test/java/org/embulk/parser/jsonl/TestJsonlGuessPlugin.java b/src/test/java/org/embulk/parser/jsonl/TestJsonlGuessPlugin.java
new file mode 100644
index 0000000..2275874
--- /dev/null
+++ b/src/test/java/org/embulk/parser/jsonl/TestJsonlGuessPlugin.java
@@ -0,0 +1,112 @@
+package org.embulk.parser.jsonl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import org.embulk.config.ConfigDiff;
+import org.embulk.config.ConfigSource;
+import org.embulk.spi.Buffer;
+import org.embulk.util.config.ConfigMapperFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Unit tests for {@link JsonlGuessPlugin}. */
+public class TestJsonlGuessPlugin {
+  private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY =
+      ConfigMapperFactory.builder().addDefaultModules().build();
+
+  private JsonlGuessPlugin plugin;
+
+  @Before
+  public void setup() {
+    plugin = new JsonlGuessPlugin();
+  }
+
+  /** Four well-formed rows must produce a parser section carrying guessed columns. */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testGuessSchema() {
+    String sampleData =
+        "{\"id\": 1, \"name\": \"Alice\", \"age\": 30}\n"
+            + "{\"id\": 2, \"name\": \"Bob\", \"age\": 25}\n"
+            + "{\"id\": 3, \"name\": \"Charlie\", \"age\": 35}\n"
+            + "{\"id\": 4, \"name\": \"Diana\", \"age\": 28}\n";
+
+    Buffer buffer = Buffer.wrap(sampleData.getBytes(StandardCharsets.UTF_8));
+
+    ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource();
+    config.setNested(
+        "parser",
+        CONFIG_MAPPER_FACTORY.newConfigSource().set("type", "jsonl").set("newline", "LF"));
+
+    ConfigDiff result = plugin.guess(config, buffer);
+
+    // Asserted unconditionally: an empty diff here means guessing is broken.
+    assertNotNull(result);
+    assertFalse(result.getAttributeNames().isEmpty());
+    ConfigDiff parserConfig = result.getNested("parser");
+    assertEquals("jsonl", parserConfig.get(String.class, "type"));
+    assertNotNull(parserConfig.get(Object.class, "columns"));
+  }
+
+  /** Timestamp-looking strings must not prevent a schema from being produced. */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testGuessSchemaWithTimestamp() {
+    String sampleData =
+        "{\"id\": 1, \"created_at\": \"2023-01-15 10:30:00\"}\n"
+            + "{\"id\": 2, \"created_at\": \"2023-02-20 14:45:00\"}\n"
+            + "{\"id\": 3, \"created_at\": \"2023-03-10 09:15:00\"}\n"
+            + "{\"id\": 4, \"created_at\": \"2023-04-05 16:20:00\"}\n";
+
+    Buffer buffer = Buffer.wrap(sampleData.getBytes(StandardCharsets.UTF_8));
+
+    ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource();
+    config.setNested(
+        "parser",
+        CONFIG_MAPPER_FACTORY.newConfigSource().set("type", "jsonl").set("newline", "LF"));
+
+    ConfigDiff result = plugin.guess(config, buffer);
+
+    assertNotNull(result);
+    assertFalse(result.getAttributeNames().isEmpty());
+    assertNotNull(result.getNested("parser").get(Object.class, "columns"));
+  }
+
+  /** Fewer rows than min_rows_for_guess must yield an empty diff. */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testGuessSchemaNotEnoughRows() {
+    String sampleData = "{\"id\": 1, \"name\": \"Alice\"}\n";
+
+    Buffer buffer = Buffer.wrap(sampleData.getBytes(StandardCharsets.UTF_8));
+
+    ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource();
+    config.setNested(
+        "parser",
+        CONFIG_MAPPER_FACTORY.newConfigSource().set("type", "jsonl").set("min_rows_for_guess", 4));
+
+    ConfigDiff result = plugin.guess(config, buffer);
+
+    assertTrue(result.getAttributeNames().isEmpty());
+  }
+
+  /** A parser type other than jsonl must be left untouched (empty diff). */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testGuessSchemaWrongParserType() {
+    String sampleData = "{\"id\": 1}\n";
+
+    Buffer buffer = Buffer.wrap(sampleData.getBytes(StandardCharsets.UTF_8));
+
+    ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource();
+    config.setNested("parser", CONFIG_MAPPER_FACTORY.newConfigSource().set("type", "csv"));
+
+    ConfigDiff result = plugin.guess(config, buffer);
+
+    assertTrue(result.getAttributeNames().isEmpty());
+  }
+}