From 0d977c0d19fad4b2264b2b6d2469decb1b58abd4 Mon Sep 17 00:00:00 2001 From: Mike Williams Date: Mon, 23 Mar 2026 18:33:56 +1100 Subject: [PATCH] refactor: use CustomAwsCredentialLoader for dynamic AWS credentials Instead of extracting credential strings from SdkConfig, stuffing them into FileIO props, then extracting them again to build S3 clients: - Implement AwsCredentialLoad to wrap SharedCredentialsProvider, so iceberg's storage layer can refresh credentials dynamically - Pass aws_sdk_s3::Client directly for S3 prefix listing, rather than reconstructing it from FileIO props - s3_file_io() is now sync and infallible - Drop direct reqsign dep; downgrade reqwest to 0.12 to match iceberg-storage-opendal's AwsCredentialLoad trait Closes #21 Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 456 +++++++++++++--------------------------- Cargo.toml | 3 +- src/aws.rs | 135 ++++++------ src/catalog_commands.rs | 13 +- src/file_existence.rs | 59 +----- src/main.rs | 27 +-- src/table_commands.rs | 24 ++- 7 files changed, 256 insertions(+), 461 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3a98a4d..ccad7ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -137,7 +137,7 @@ dependencies = [ "serde_json", "strum 0.27.2", "strum_macros 0.27.2", - "thiserror 2.0.18", + "thiserror", "uuid", "zstd", ] @@ -884,8 +884,7 @@ dependencies = [ "iceberg-catalog-glue", "iceberg-catalog-rest", "iceberg-storage-opendal", - "reqsign 0.19.0", - "reqwest 0.13.2", + "reqwest", "serde", "serde_json", "strum 0.28.0", @@ -1032,12 +1031,6 @@ dependencies = [ "shlex", ] -[[package]] -name = "cesu8" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" - [[package]] name = "cfg-if" version = "1.0.4" @@ -1119,16 +1112,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" -[[package]] -name = "combine" -version = "4.6.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" -dependencies = [ - "bytes", - "memchr", -] - [[package]] name = "concurrent-queue" version = "2.5.0" @@ -1689,6 +1672,21 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -2123,6 +2121,22 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.8.1", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.20" @@ -2209,7 +2223,7 @@ dependencies = [ "ordered-float 4.6.0", "parquet", "rand 0.8.5", - "reqwest 0.12.28", + "reqwest", "roaring", "serde", "serde_bytes", @@ -2254,7 +2268,7 @@ dependencies = [ "http 1.4.0", "iceberg", "itertools", - "reqwest 0.12.28", + "reqwest", "serde", "serde_derive", "serde_json", @@ -2276,8 +2290,8 @@ dependencies = [ "cfg-if", "iceberg", "opendal", - "reqsign 0.16.5", - "reqwest 0.12.28", + "reqsign", + "reqwest", "serde", "typetag", "url", @@ -2513,28 +2527,6 @@ dependencies = [ "jiff-tzdb", ] -[[package]] -name = "jni" -version = "0.21.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" -dependencies = [ - "cesu8", - "cfg-if", - "combine", - "jni-sys", - "log", - "thiserror 1.0.69", - "walkdir", - "windows-sys 0.45.0", -] - -[[package]] -name = "jni-sys" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" - [[package]] name = "jobserver" version = "0.1.34" @@ -2636,6 +2628,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" +[[package]] +name = "linux-raw-sys" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" + [[package]] name = "litemap" version = "0.8.1" @@ -2759,6 +2757,23 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9252111cf132ba0929b6f8e030cac2a24b507f3a4d6db6fb2896f27b354c714b" +[[package]] +name = "native-tls" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "465500e14ea162429d264d44189adc38b199b62b1c21eea9f69e4b73cb03bbf2" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -2845,8 +2860,8 @@ dependencies = [ "md-5", "percent-encoding", "quick-xml 0.38.4", - "reqsign 0.16.5", - "reqwest 0.12.28", + "reqsign", + "reqwest", "serde", "serde_json", "tokio", @@ -2854,12 +2869,50 @@ dependencies = [ "uuid", ] +[[package]] +name = "openssl" +version = "0.10.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "openssl-probe" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "openssl-sys" +version = "0.9.111" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82cab2d520aa75e3c58898289429321eb788c3106963d0dc886ec7a5f4adc321" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -3108,7 +3161,7 @@ dependencies = [ "rustc-hash", "rustls 0.23.36", "socket2 0.6.2", - "thiserror 2.0.18", + "thiserror", "tokio", "tracing", "web-time", @@ -3120,7 +3173,6 @@ version = "0.11.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" dependencies = [ - "aws-lc-rs", "bytes", "getrandom 0.3.4", "lru-slab", @@ -3130,7 +3182,7 @@ dependencies = [ "rustls 0.23.36", "rustls-pki-types", "slab", - "thiserror 2.0.18", + "thiserror", "tinyvec", "tracing", "web-time", @@ -3308,7 +3360,7 @@ dependencies = [ "percent-encoding", "quick-xml 0.37.5", "rand 0.8.5", - "reqwest 0.12.28", + "reqwest", "rust-ini", "serde", "serde_json", @@ -3317,80 +3369,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "reqsign" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af8fa9a6948938319944b0f5399b1de4324767ac35c8bdbf991b3e71197b1087" -dependencies = [ - "reqsign-command-execute-tokio", - "reqsign-core", - "reqsign-file-read-tokio", - "reqsign-http-send-reqwest", -] - -[[package]] -name = "reqsign-command-execute-tokio" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90eac3cf53a53139831e3fb8ff2189d54059d2191122a31ebd1229a66e338b3d" -dependencies = [ - "async-trait", - "reqsign-core", - "tokio", -] - -[[package]] -name = "reqsign-core" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ba66eb941c0f723269a394baf3b19a2fa697a1e593f3e902779df6c35d24e21" -dependencies = [ - "anyhow", - "async-trait", - "base64", - "bytes", - "form_urlencoded", - "hex", - "hmac", - "http 1.4.0", - "jiff", - "log", - "percent-encoding", - "sha1", - "sha2", - "windows-sys 0.61.2", -] - -[[package]] -name = "reqsign-file-read-tokio" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "702f12a867bf8e507de907fa0f4d75b96469ace7edd33fcc1fc8a8ef58f3c8d2" -dependencies = [ - "anyhow", - "async-trait", - "reqsign-core", - "tokio", -] - -[[package]] -name = "reqsign-http-send-reqwest" -version = "3.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c76a1aafda3c789146d5df362fb2e760051c38e0957f863c0bec8713d3a35833" -dependencies = [ - "anyhow", - "async-trait", - "bytes", - "futures-channel", - "http 1.4.0", - "http-body-util", - "reqsign-core", - "reqwest 0.13.2", - "wasm-bindgen-futures", -] - [[package]] name = "reqwest" version = "0.12.28" @@ -3399,16 +3377,21 @@ checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" dependencies = [ "base64", "bytes", + "encoding_rs", "futures-core", "futures-util", + "h2 0.4.13", "http 1.4.0", "http-body 1.0.1", "http-body-util", "hyper 1.8.1", "hyper-rustls 0.27.7", + "hyper-tls", "hyper-util", "js-sys", "log", + "mime", + "native-tls", "percent-encoding", "pin-project-lite", "quinn", @@ -3419,6 +3402,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper", "tokio", + "tokio-native-tls", "tokio-rustls 0.26.4", "tokio-util", "tower", @@ -3432,44 +3416,6 @@ dependencies = [ "webpki-roots", ] -[[package]] -name = "reqwest" -version = "0.13.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab3f43e3283ab1488b624b44b0e988d0acea0b3214e694730a055cb6b2efa801" -dependencies = [ - "base64", - "bytes", - "encoding_rs", - "futures-core", - "h2 0.4.13", - "http 1.4.0", - "http-body 1.0.1", - "http-body-util", - "hyper 1.8.1", - "hyper-rustls 0.27.7", - "hyper-util", - "js-sys", - "log", - "mime", - "percent-encoding", - "pin-project-lite", - "quinn", - "rustls 0.23.36", - "rustls-pki-types", - "rustls-platform-verifier", - "sync_wrapper", - "tokio", - "tokio-rustls 0.26.4", - "tower", - "tower-http", - "tower-service", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", -] - [[package]] name = "rfc6979" version = "0.3.1" @@ -3530,6 +3476,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.52.0", +] + [[package]] name = "rustls" version = "0.21.12" @@ -3579,33 +3538,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "rustls-platform-verifier" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784" -dependencies = [ - "core-foundation 0.10.1", - "core-foundation-sys", - "jni", - "log", - "once_cell", - "rustls 0.23.36", - "rustls-native-certs", - "rustls-platform-verifier-android", - "rustls-webpki 0.103.9", - "security-framework", - "security-framework-sys", - "webpki-root-certs", - "windows-sys 0.61.2", -] - -[[package]] -name = "rustls-platform-verifier-android" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" - [[package]] name = "rustls-webpki" version = "0.101.7" @@ -3640,15 +3572,6 @@ version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" -[[package]] -name = "same-file" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" -dependencies = [ - "winapi-util", -] - [[package]] name = "schannel" version = "0.1.28" @@ -4105,12 +4028,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" [[package]] -name = "thiserror" -version = "1.0.69" +name = "tempfile" +version = "3.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +checksum = "82a72c767771b47409d2345987fda8628641887d5466101319899796367354a0" dependencies = [ - "thiserror-impl 1.0.69", + "fastrand", + "getrandom 0.3.4", + "once_cell", + "rustix", + "windows-sys 0.52.0", ] [[package]] @@ -4119,18 +4046,7 @@ version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl 2.0.18", -] - -[[package]] -name = "thiserror-impl" -version = "1.0.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "thiserror-impl", ] [[package]] @@ -4257,6 +4173,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -4530,6 +4456,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" @@ -4542,16 +4474,6 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" -[[package]] -name = "walkdir" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" -dependencies = [ - "same-file", - "winapi-util", -] - [[package]] name = "want" version = "0.3.1" @@ -4711,15 +4633,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki-root-certs" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "804f18a4ac2676ffb4e8b5b5fa9ae38af06df08162314f96a68d2a363e21a8ca" -dependencies = [ - "rustls-pki-types", -] - [[package]] name = "webpki-roots" version = "1.0.6" @@ -4729,15 +4642,6 @@ dependencies = [ "rustls-pki-types", ] -[[package]] -name = "winapi-util" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" -dependencies = [ - "windows-sys 0.61.2", -] - [[package]] name = "windows-core" version = "0.62.2" @@ -4808,15 +4712,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "windows-sys" -version = "0.45.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" -dependencies = [ - "windows-targets 0.42.2", -] - [[package]] name = "windows-sys" version = "0.52.0" @@ -4844,21 +4739,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "windows-targets" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" -dependencies = [ - "windows_aarch64_gnullvm 0.42.2", - "windows_aarch64_msvc 0.42.2", - "windows_i686_gnu 0.42.2", - "windows_i686_msvc 0.42.2", - "windows_x86_64_gnu 0.42.2", - "windows_x86_64_gnullvm 0.42.2", - "windows_x86_64_msvc 0.42.2", -] - [[package]] name = "windows-targets" version = "0.52.6" @@ -4892,12 +4772,6 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" - [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -4910,12 +4784,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" -[[package]] -name = "windows_aarch64_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" - [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -4928,12 +4796,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" -[[package]] -name = "windows_i686_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" - [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -4958,12 +4820,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" -[[package]] -name = "windows_i686_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" - [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -4976,12 +4832,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" -[[package]] -name = "windows_x86_64_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" - [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -4994,12 +4844,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" - [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -5012,12 +4856,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" -[[package]] -name = "windows_x86_64_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" - [[package]] name = "windows_x86_64_msvc" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index b71384f..6f9cc21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,8 +23,7 @@ iceberg = "0.9.0" iceberg-catalog-glue = "0.9.0" iceberg-catalog-rest = "0.9.0" iceberg-storage-opendal = "0.9.0" -reqsign = "0.19" -reqwest = "0.13" +reqwest = "0.12" serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.145" strum = { version = "0.28.0", features = ["derive"] } diff --git a/src/aws.rs b/src/aws.rs index e3335b0..fa21501 100644 --- a/src/aws.rs +++ b/src/aws.rs @@ -1,18 +1,19 @@ //! AWS integration utilities for credential loading use anyhow::Result; +use async_trait::async_trait; use aws_config::BehaviorVersion; use aws_config::meta::credentials::CredentialsProviderChain; use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider}; use iceberg::CatalogBuilder; -use iceberg::io::{ - FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN, -}; +use iceberg::io::{FileIO, FileIOBuilder, S3_REGION}; use iceberg_catalog_glue::{ AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, GLUE_CATALOG_PROP_WAREHOUSE, GlueCatalog, GlueCatalogBuilder, }; -use iceberg_storage_opendal::OpenDalStorageFactory; +use iceberg_storage_opendal::{ + AwsCredential, AwsCredentialLoad, CustomAwsCredentialLoader, OpenDalStorageFactory, +}; use std::collections::HashMap; use std::sync::Arc; @@ -31,6 +32,41 @@ fn build_credentials_provider() -> SharedCredentialsProvider { SharedCredentialsProvider::new(chain) } +/// Adapts an AWS SDK credential provider to the `AwsCredentialLoad` trait +/// used by iceberg-storage-opendal. This allows the iceberg storage layer +/// to dynamically refresh credentials rather than using static strings. +struct SdkCredentialLoader { + provider: SharedCredentialsProvider, +} + +#[async_trait] +impl AwsCredentialLoad for SdkCredentialLoader { + async fn load_credential(&self, _: reqwest::Client) -> anyhow::Result> { + let creds = self.provider.provide_credentials().await?; + Ok(Some(AwsCredential { + access_key_id: creds.access_key_id().to_string(), + secret_access_key: creds.secret_access_key().to_string(), + session_token: creds.session_token().map(|s| s.to_string()), + expires_in: None, + })) + } +} + +/// Build a `CustomAwsCredentialLoader` from the SDK config's credential provider. +fn credential_loader(aws_config: &aws_config::SdkConfig) -> Option { + aws_config + .credentials_provider() + .map(|provider| CustomAwsCredentialLoader::new(Arc::new(SdkCredentialLoader { provider }))) +} + +/// Build an `OpenDalStorageFactory` for S3 with optional dynamic credentials. +fn s3_storage_factory(aws_config: &aws_config::SdkConfig) -> Arc { + Arc::new(OpenDalStorageFactory::S3 { + configured_scheme: "s3".to_string(), + customized_credential_load: credential_loader(aws_config), + }) +} + pub async fn get_aws_config() -> aws_config::SdkConfig { let config = aws_config::defaults(BehaviorVersion::latest()) .credentials_provider(build_credentials_provider()) @@ -48,31 +84,14 @@ pub async fn get_aws_config() -> aws_config::SdkConfig { config } -pub async fn s3_file_io(aws_config: &aws_config::SdkConfig) -> Result { - let factory = Arc::new(OpenDalStorageFactory::S3 { - configured_scheme: "s3".to_string(), - customized_credential_load: None, - }); - let mut builder = FileIOBuilder::new(factory); +pub fn s3_file_io(aws_config: &aws_config::SdkConfig) -> FileIO { + let mut builder = FileIOBuilder::new(s3_storage_factory(aws_config)); - // Add region from AWS config if let Some(region) = aws_config.region() { builder = builder.with_prop(S3_REGION, region.to_string()); } - // Extract and add credentials from AWS SDK - if let Some(creds_provider) = aws_config.credentials_provider() - && let Ok(creds) = creds_provider.provide_credentials().await - { - builder = builder.with_prop(S3_ACCESS_KEY_ID, creds.access_key_id()); - builder = builder.with_prop(S3_SECRET_ACCESS_KEY, creds.secret_access_key()); - - if let Some(session_token) = creds.session_token() { - builder = builder.with_prop(S3_SESSION_TOKEN, session_token); - } - } - - Ok(builder.build()) + builder.build() } pub async fn glue_catalog(aws_config: &aws_config::SdkConfig) -> Result { @@ -84,12 +103,10 @@ pub async fn glue_catalog(aws_config: &aws_config::SdkConfig) -> Result Result Result<()> { - // Create a test AWS config with mock credentials + async fn test_s3_file_io_sets_region() { let aws_config = test_aws_config().await; - - // Build FileIO with credentials from aws_config - let file_io = s3_file_io(&aws_config).await?; - - // Inspect the properties that were set + let file_io = s3_file_io(&aws_config); let props = file_io.config().props(); - // Verify region was extracted and set assert_eq!(props.get(S3_REGION), Some(&"us-west-2".to_string())); + } - // Verify credentials were extracted and set - assert_eq!( - props.get(S3_ACCESS_KEY_ID), - Some(&"test_access_key".to_string()) - ); - assert_eq!( - props.get(S3_SECRET_ACCESS_KEY), - Some(&"test_secret_key".to_string()) - ); - assert_eq!( - props.get(S3_SESSION_TOKEN), - Some(&"test_session_token".to_string()) - ); + #[tokio::test] + async fn test_credential_loader_returns_credentials() { + let aws_config = test_aws_config().await; + let loader = credential_loader(&aws_config).expect("should have a credential loader"); - Ok(()) + let cred = loader + .load_credential(reqwest::Client::new()) + .await + .expect("should load credential") + .expect("should have credential"); + + assert_eq!(cred.access_key_id, "test_access_key"); + assert_eq!(cred.secret_access_key, "test_secret_key"); + assert_eq!(cred.session_token, Some("test_session_token".to_string())); } #[tokio::test] async fn test_glue_catalog_with_aws_config() -> Result<()> { - // Create a test AWS config with mock credentials let aws_config = test_aws_config().await; - - // Build GlueCatalog with credentials from aws_config let catalog = glue_catalog(&aws_config).await?; - - // Get the FileIO that GlueCatalog created internally let file_io = catalog.file_io(); - - // Inspect the properties that were passed through let props = file_io.config().props(); - // Verify region was extracted and set assert_eq!(props.get(S3_REGION), Some(&"us-west-2".to_string())); - // Verify credentials were extracted and set - assert_eq!( - props.get(S3_ACCESS_KEY_ID), - Some(&"test_access_key".to_string()) - ); - assert_eq!( - props.get(S3_SECRET_ACCESS_KEY), - Some(&"test_secret_key".to_string()) - ); - assert_eq!( - props.get(S3_SESSION_TOKEN), - Some(&"test_session_token".to_string()) - ); - Ok(()) } } diff --git a/src/catalog_commands.rs b/src/catalog_commands.rs index 1ad7de8..0bcb621 100644 --- a/src/catalog_commands.rs +++ b/src/catalog_commands.rs @@ -12,6 +12,7 @@ pub async fn handle_catalog_command( catalog: &dyn Catalog, command: CatalogCommands, output: &mut TerminalOutput, + s3_client: Option<&aws_sdk_s3::Client>, ) -> Result<()> { use crate::cli::NamespaceCmd; @@ -22,7 +23,7 @@ pub async fn handle_catalog_command( NamespaceCmd::Tables => list_tables_in_namespace(catalog, &name, output).await, }, CatalogCommands::Table { name, command } => { - load_and_handle_table(catalog, &name, command, output).await + load_and_handle_table(catalog, &name, command, output, s3_client).await } } } @@ -88,6 +89,7 @@ async fn load_and_handle_table( name: &str, command: crate::cli::TableCommands, output: &mut TerminalOutput, + s3_client: Option<&aws_sdk_s3::Client>, ) -> Result<()> { // Parse table identifier (e.g., "namespace.table" or "db.schema.table") let table_ident = TableIdent::from_strs(name.split('.'))?; @@ -99,7 +101,7 @@ async fn load_and_handle_table( .with_context(|| format!("could not load table '{}'", name))?; // Delegate to table command handler - handle_table_command(&table, command, output).await + handle_table_command(&table, command, output, s3_client).await } #[cfg(test)] @@ -131,7 +133,7 @@ mod tests { let mut buffer = Vec::new(); let mut output = TerminalOutput::with_writer(&mut buffer); - handle_catalog_command(&catalog, CatalogCommands::Namespaces, &mut output).await?; + handle_catalog_command(&catalog, CatalogCommands::Namespaces, &mut output, None).await?; let output_str = String::from_utf8(buffer)?; assert_eq!(output_str, ""); @@ -157,7 +159,7 @@ mod tests { let mut buffer = Vec::new(); let mut output = TerminalOutput::with_writer(&mut buffer); - handle_catalog_command(&catalog, CatalogCommands::Namespaces, &mut output).await?; + handle_catalog_command(&catalog, CatalogCommands::Namespaces, &mut output, None).await?; let output_str = String::from_utf8(buffer)?; let lines: Vec<&str> = output_str.lines().collect(); @@ -199,6 +201,7 @@ mod tests { command: NamespaceCmd::Info, }, &mut output, + None, ) .await?; @@ -263,6 +266,7 @@ mod tests { command: NamespaceCmd::Tables, }, &mut output, + None, ) .await?; @@ -324,6 +328,7 @@ mod tests { command: crate::cli::TableCommands::Metadata, }, &mut output, + None, ) .await?; diff --git a/src/file_existence.rs b/src/file_existence.rs index bae97cf..9af0035 100644 --- a/src/file_existence.rs +++ b/src/file_existence.rs @@ -5,12 +5,8 @@ use anyhow::{Context, Result}; use async_trait::async_trait; -use aws_config::Region; -use aws_credential_types::Credentials; use aws_sdk_s3::Client; -use aws_sdk_s3::config::Builder as S3ConfigBuilder; -use iceberg::io::{FileIO, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN}; -use std::collections::HashMap; +use iceberg::io::FileIO; use std::collections::HashSet; use tracing::debug; use tracing::info; @@ -66,22 +62,23 @@ impl FileExistenceChecker for PreloadedExistenceChecker { /// Creates a file existence checker, using S3 prefix listing if possible. /// -/// If the data prefix is on S3 and we can extract credentials from the FileIO, -/// returns a `PreloadedExistenceChecker` that has listed all objects under the prefix. +/// If the data prefix is on S3 and an S3 client is provided, uses a bulk +/// `ListObjectsV2` call and returns a `PreloadedExistenceChecker`. /// /// Otherwise, returns a `FileIOExistenceChecker` that delegates to per-file checks. pub async fn create_existence_checker( file_io: FileIO, data_prefix: &str, + s3_client: Option<&Client>, ) -> Result> { debug!(data_prefix = %data_prefix, "Creating file existence checker"); - // Try S3 optimization: parse URL and build client + // Try S3 optimization: parse URL and use provided client if let Some((bucket, prefix)) = parse_s3_url(data_prefix) - && let Some(client) = s3_client_from_file_io(&file_io) + && let Some(client) = s3_client { let base_url = format!("s3://{}/{}", bucket, prefix); - let suffixes = list_object_suffixes(&client, bucket, prefix).await?; + let suffixes = list_object_suffixes(client, bucket, prefix).await?; debug!( file_count = suffixes.len(), "Using preloaded S3 existence checker" @@ -136,48 +133,6 @@ async fn list_object_suffixes( Ok(suffixes) } -/// Attempts to build an S3 client from the credentials stored in a FileIO. -fn s3_client_from_file_io(file_io: &FileIO) -> Option { - let props = file_io.config().props(); - debug!("Extracting S3 credentials from FileIO"); - - s3_client_from_props(props) -} - -/// Builds an S3 client from a properties map containing S3 credentials. -fn s3_client_from_props(props: &HashMap) -> Option { - let access_key_id = props.get(S3_ACCESS_KEY_ID); - let secret_access_key = props.get(S3_SECRET_ACCESS_KEY); - let region = props.get(S3_REGION); - - debug!( - has_access_key = access_key_id.is_some(), - has_secret_key = secret_access_key.is_some(), - has_region = region.is_some(), - "Checking FileIO properties for S3 credentials" - ); - - let access_key_id = access_key_id?; - let secret_access_key = secret_access_key?; - let region = region?; - - let credentials = Credentials::new( - access_key_id, - secret_access_key, - props.get(S3_SESSION_TOKEN).cloned(), - None, - "iceberg-file-io", - ); - - let config = S3ConfigBuilder::new() - .behavior_version_latest() - .region(Region::new(region.clone())) - .credentials_provider(credentials) - .build(); - - Some(Client::from_conf(config)) -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/main.rs b/src/main.rs index 6f14d85..fef99e7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,15 +10,6 @@ use clap::Parser; use iceberg::io::FileIO; use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt}; -async fn build_file_io(location: &str) -> Result { - if location.starts_with("s3://") || location.starts_with("s3a://") { - let aws_config = get_aws_config().await; - return s3_file_io(&aws_config).await; - } - - Ok(FileIO::new_with_fs()) -} - #[tokio::main] async fn main() { let cli = Cli::parse(); @@ -60,16 +51,26 @@ async fn main() { async fn run(command: Commands) -> Result<()> { match command { Commands::From { location, command } => { - let file_io = build_file_io(&location).await?; + let is_s3 = location.starts_with("s3://") || location.starts_with("s3a://"); + let (file_io, s3) = if is_s3 { + let aws_config = get_aws_config().await; + ( + s3_file_io(&aws_config), + Some(aws_sdk_s3::Client::new(&aws_config)), + ) + } else { + (FileIO::new_with_fs(), None) + }; let table = load_table(&file_io, &location).await?; let mut output = TerminalOutput::new(); - handle_table_command(&table, command, &mut output).await?; + handle_table_command(&table, command, &mut output, s3.as_ref()).await?; } Commands::Glue { command } => { let aws_config = get_aws_config().await; let catalog = glue_catalog(&aws_config).await?; + let s3 = aws_sdk_s3::Client::new(&aws_config); let mut output = TerminalOutput::new(); - handle_catalog_command(&catalog, command, &mut output).await?; + handle_catalog_command(&catalog, command, &mut output, Some(&s3)).await?; } Commands::Rest { uri, @@ -78,7 +79,7 @@ async fn run(command: Commands) -> Result<()> { } => { let catalog = rest_catalog(&uri, warehouse.as_deref()).await?; let mut output = TerminalOutput::new(); - handle_catalog_command(&catalog, command, &mut output).await?; + handle_catalog_command(&catalog, command, &mut output, None).await?; } } diff --git a/src/table_commands.rs b/src/table_commands.rs index 24a305a..564c737 100644 --- a/src/table_commands.rs +++ b/src/table_commands.rs @@ -76,6 +76,7 @@ pub async fn handle_table_command( table: &Table, command: TableCommands, output: &mut TerminalOutput, + s3_client: Option<&aws_sdk_s3::Client>, ) -> Result<()> { match command { TableCommands::Metadata => handle_metadata(table.metadata(), output), @@ -85,7 +86,7 @@ pub async fn handle_table_command( TableCommands::Snapshot { snapshot_id, command, - } => handle_snapshot(table, &snapshot_id, command, output).await, + } => handle_snapshot(table, &snapshot_id, command, s3_client, output).await, } } @@ -140,6 +141,7 @@ async fn handle_snapshot( table: &Table, snapshot_id: &str, command: SnapshotCmd, + s3_client: Option<&aws_sdk_s3::Client>, output: &mut TerminalOutput, ) -> Result<()> { let metadata = table.metadata(); @@ -162,7 +164,7 @@ async fn handle_snapshot( match command { SnapshotCmd::Info => output.display_object(&SnapshotInfo::from_snapshot(snapshot)), SnapshotCmd::Files { verify } => { - handle_snapshot_files(table, snapshot, verify, output).await + handle_snapshot_files(table, snapshot, verify, s3_client, output).await } } } @@ -180,11 +182,12 @@ async fn handle_snapshot_files( table: &Table, snapshot: &iceberg::spec::Snapshot, verify: bool, + s3_client: Option<&aws_sdk_s3::Client>, output: &mut TerminalOutput, ) -> Result<()> { let existence_checker: Option> = if verify { let prefix = data_file_prefix(table.metadata())?; - Some(create_existence_checker(table.file_io().clone(), &prefix).await?) + Some(create_existence_checker(table.file_io().clone(), &prefix, s3_client).await?) } else { None }; @@ -384,6 +387,7 @@ mod tests { schema_id: "current".to_string(), }, &mut output, + None, ) .await?; @@ -415,6 +419,7 @@ mod tests { command: SnapshotCmd::Info, }, &mut output, + None, ) .await?; @@ -438,7 +443,7 @@ mod tests { let mut buffer = Vec::new(); let mut output = TerminalOutput::with_writer(&mut buffer); - handle_table_command(&table, TableCommands::Metadata, &mut output).await?; + handle_table_command(&table, TableCommands::Metadata, &mut output, None).await?; // Verify JSON output contains metadata fields let output_str = String::from_utf8(buffer)?; @@ -460,7 +465,7 @@ mod tests { let mut buffer = Vec::new(); let mut output = TerminalOutput::with_writer(&mut buffer); - handle_table_command(&table, TableCommands::Schemas, &mut output).await?; + handle_table_command(&table, TableCommands::Schemas, &mut output, None).await?; // Verify JSONL output (one schema per line) let output_str = String::from_utf8(buffer)?; @@ -490,6 +495,7 @@ mod tests { schema_id: "0".to_string(), }, &mut output, + None, ) .await?; @@ -517,6 +523,7 @@ mod tests { schema_id: "invalid".to_string(), }, &mut output, + None, ) .await; @@ -546,6 +553,7 @@ mod tests { schema_id: "999".to_string(), }, &mut output, + None, ) .await; @@ -569,7 +577,7 @@ mod tests { let mut buffer = Vec::new(); let mut output = TerminalOutput::with_writer(&mut buffer); - handle_table_command(&table, TableCommands::Snapshots, &mut output).await?; + handle_table_command(&table, TableCommands::Snapshots, &mut output, None).await?; // Verify JSONL output let output_str = String::from_utf8(buffer)?; @@ -599,6 +607,7 @@ mod tests { command: SnapshotCmd::Info, }, &mut output, + None, ) .await?; @@ -627,6 +636,7 @@ mod tests { command: SnapshotCmd::Info, }, &mut output, + None, ) .await; @@ -657,6 +667,7 @@ mod tests { command: SnapshotCmd::Info, }, &mut output, + None, ) .await; @@ -687,6 +698,7 @@ mod tests { command: SnapshotCmd::Info, }, &mut output, + None, ) .await;