From 8e1c043313cbfccd4bccf853e9326672d1d9853d Mon Sep 17 00:00:00 2001 From: Avinash Kumar Deepak Date: Sun, 29 Mar 2026 22:40:13 +0530 Subject: [PATCH] add java end-to-end study example --- example/java_e2e/README.md | 36 ++++++++++ example/java_e2e/controller.py | 45 ++++++++++++ example/java_e2e/java_e2e.graphml | 47 ++++++++++++ example/java_e2e/pm_java.java | 43 +++++++++++ example/java_e2e/smoke_check.py | 114 ++++++++++++++++++++++++++++++ 5 files changed, 285 insertions(+) create mode 100644 example/java_e2e/README.md create mode 100644 example/java_e2e/controller.py create mode 100644 example/java_e2e/java_e2e.graphml create mode 100644 example/java_e2e/pm_java.java create mode 100644 example/java_e2e/smoke_check.py diff --git a/example/java_e2e/README.md b/example/java_e2e/README.md new file mode 100644 index 00000000..75c0e812 --- /dev/null +++ b/example/java_e2e/README.md @@ -0,0 +1,36 @@ +# Java end-to-end example + +This example runs a Python controller (`controller.py`) and a Java PM node (`pm_java.java`) over the standard concore file-based exchange. + +## Files + +- `controller.py` - Python controller node +- `pm_java.java` - Java PM node using `concoredocker.java` +- `java_e2e.graphml` - workflow graph for the example +- `smoke_check.py` - lightweight verification script + +## Prerequisites + +- Python environment with project dependencies installed +- JDK (for `javac` and `java`) +- `jeromq-0.6.0.jar` + +Download jar (from repo root): + +```bash +mkdir -p .ci-cache/java +curl -fsSL -o .ci-cache/java/jeromq-0.6.0.jar https://repo1.maven.org/maven2/org/zeromq/jeromq/0.6.0/jeromq-0.6.0.jar +``` + +## Run smoke check + +From repo root: + +```bash +python example/java_e2e/smoke_check.py --jar .ci-cache/java/jeromq-0.6.0.jar +``` + +Expected result: + +- script prints `smoke_check passed` +- final `u` and `ym` payloads are printed in concore wire format diff --git a/example/java_e2e/controller.py b/example/java_e2e/controller.py new file mode 100644 index 00000000..e5faa0b7 --- /dev/null +++ b/example/java_e2e/controller.py @@ -0,0 +1,45 @@ +import ast +import os +from pathlib import Path + +import concore +import numpy as np + + +ysp = 3.0 + + +def controller(ym): + if ym[0] < ysp: + return 1.01 * ym + else: + return 0.9 * ym + + +study_dir = os.environ.get("CONCORE_STUDY_DIR", str(Path(__file__).resolve().parent / "study")) +os.makedirs(os.path.join(study_dir, "1"), exist_ok=True) + +concore.inpath = os.path.join(study_dir, "") +concore.outpath = os.path.join(study_dir, "") +concore.default_maxtime(20) +concore.delay = 0.02 + +init_simtime_u = "[0.0, 0.0]" +init_simtime_ym = "[0.0, 0.0]" + +u = np.array([concore.initval(init_simtime_u)]).T +while(concore.simtime + + + + + + + + + + + CJ:example/java_e2e/controller.py + + + + + + + + + + + PJ:example/java_e2e/pm_java.java + + + + + + + + + + CU + + + + + + + + + PYM + + + + + diff --git a/example/java_e2e/pm_java.java b/example/java_e2e/pm_java.java new file mode 100644 index 00000000..1b11e075 --- /dev/null +++ b/example/java_e2e/pm_java.java @@ -0,0 +1,43 @@ +import java.util.ArrayList; +import java.util.List; + +public class pm_java { + private static final String INIT_SIMTIME_U = "[0.0, 0.0]"; + + private static double toDouble(Object value) { + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + return 0.0; + } + + public static void main(String[] args) { + String studyDir = System.getenv("CONCORE_STUDY_DIR"); + if (studyDir == null || studyDir.isEmpty()) { + studyDir = "example/java_e2e/study"; + } + double maxTime = 20.0; + + concoredocker.setInPath(studyDir); + concoredocker.setOutPath(studyDir); + concoredocker.setDelay(20); + concoredocker.defaultMaxTime(maxTime); + + while (concoredocker.getSimtime() < maxTime) { + concoredocker.ReadResult readResult = concoredocker.read(1, "u", INIT_SIMTIME_U); + List u = readResult.data; + + double u0 = 0.0; + if (!u.isEmpty()) { + u0 = toDouble(u.get(0)); + } + + double ym0 = u0 + 0.01; + List ym = new ArrayList<>(); + ym.add(ym0); + + System.out.println(concoredocker.getSimtime() + ". u=" + u + " ym=" + ym); + concoredocker.write(1, "ym", ym, 1); + } + } +} diff --git a/example/java_e2e/smoke_check.py b/example/java_e2e/smoke_check.py new file mode 100644 index 00000000..5fc04e92 --- /dev/null +++ b/example/java_e2e/smoke_check.py @@ -0,0 +1,114 @@ +import argparse +import ast +import os +from pathlib import Path +import shutil +import subprocess +import sys + + +JEROMQ_URL = "https://repo1.maven.org/maven2/org/zeromq/jeromq/0.6.0/jeromq-0.6.0.jar" + + +def parse_args(): + parser = argparse.ArgumentParser(description="Run Java e2e example smoke check") + parser.add_argument("--jar", type=Path, help="Path to jeromq jar") + parser.add_argument("--keep-study", action="store_true", help="Keep generated study files") + return parser.parse_args() + +def main(): + args = parse_args() + here = Path(__file__).resolve().parent + repo_root = here.parents[1] + study_dir = here / "study" + jar_path = args.jar or (repo_root / ".ci-cache" / "java" / "jeromq-0.6.0.jar") + + if not jar_path.exists(): + raise FileNotFoundError( + f"Missing jeromq jar at {jar_path}. Download from {JEROMQ_URL} or pass --jar." + ) + + if study_dir.exists(): + shutil.rmtree(study_dir) + (study_dir / "1").mkdir(parents=True, exist_ok=True) + (study_dir / "1" / "concore.maxtime").write_text("20", encoding="utf-8") + + subprocess.run( + [ + "javac", + "-cp", + str(jar_path), + str(repo_root / "concoredocker.java"), + str(here / "pm_java.java"), + ], + check=True, + cwd=repo_root, + ) + + env = os.environ.copy() + env["CONCORE_STUDY_DIR"] = str(study_dir) + classpath = os.pathsep.join([str(repo_root), str(here), str(jar_path)]) + + py_log = open(study_dir / "controller.log", "w", encoding="utf-8") + java_log = open(study_dir / "pm_java.log", "w", encoding="utf-8") + + py_proc = subprocess.Popen( + [sys.executable, str(here / "controller.py")], + cwd=repo_root, + env=env, + stdout=py_log, + stderr=subprocess.STDOUT, + ) + java_proc = subprocess.Popen( + ["java", "-cp", classpath, "pm_java"], + cwd=repo_root, + env=env, + stdout=java_log, + stderr=subprocess.STDOUT, + ) + + try: + py_rc = py_proc.wait(timeout=45) + java_rc = java_proc.wait(timeout=45) + except subprocess.TimeoutExpired: + py_proc.kill() + java_proc.kill() + py_log.close() + java_log.close() + raise RuntimeError("Timed out waiting for node processes") + + py_log.close() + java_log.close() + + if py_rc != 0 or java_rc != 0: + raise RuntimeError( + f"Node process failed (controller={py_rc}, pm_java={java_rc}). " + f"See {study_dir / 'controller.log'} and {study_dir / 'pm_java.log'}." + ) + + u_path = study_dir / "1" / "u" + ym_path = study_dir / "1" / "ym" + if not u_path.exists() or not ym_path.exists(): + raise RuntimeError("Expected output files were not produced") + + u_val = ast.literal_eval(u_path.read_text(encoding="utf-8")) + ym_val = ast.literal_eval(ym_path.read_text(encoding="utf-8")) + if not isinstance(u_val, list) or len(u_val) < 2: + raise RuntimeError("u output did not match expected wire format") + if not isinstance(ym_val, list) or len(ym_val) < 2: + raise RuntimeError("ym output did not match expected wire format") + + print("smoke_check passed") + print(f"u: {u_val}") + print(f"ym: {ym_val}") + + if not args.keep_study and study_dir.exists(): + shutil.rmtree(study_dir) + + class_file = here / "pm_java.class" + if class_file.exists(): + class_file.unlink() + + +if __name__ == "__main__": + main()