Add Lake Formation credential vending for offline store ingestion (Spark 3.5+) #57
BassemHalim wants to merge 29 commits into main from
Conversation
Add LakeFormationCredentials case class to hold LF-vended temporary credentials with isExpiringSoon() for refresh logic. Extend FeatureGroupArnResolver with resolveAccountId() and resolvePartition() to extract account ID and ARN partition from feature group ARNs. --- X-AI-Prompt: add lake formation managed table support to spark connector X-AI-Tool: kiro-cli
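A minimal sketch of what such a case class can look like. Field names beyond the credential triple and expiry are assumptions; the 5-minute buffer and the `toString` masking are described in later commits of this PR:

```scala
import java.time.Instant

// Sketch only -- field names and the buffer default are assumptions.
case class LakeFormationCredentials(
    accessKeyId: String,
    secretAccessKey: String,
    sessionToken: String,
    expiration: Instant) {

  // "Expiring soon" means within a small buffer (the PR mentions 5 minutes),
  // so callers can vend fresh credentials before the write starts.
  def isExpiringSoon(bufferSeconds: Long = 300): Boolean =
    Instant.now().plusSeconds(bufferSeconds).isAfter(expiration)

  // Mask secrets so accidental logging cannot leak credentials (a later
  // commit in this PR adds exactly this kind of toString override).
  override def toString: String =
    s"LakeFormationCredentials(accessKeyId=****, expiration=$expiration)"
}
```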
Add LakeFormationHelper singleton with checkAndVendCredentials() that calls Glue GetTable to detect LF-managed tables and vends temporary credentials via GetTemporaryGlueTableCredentials. All failures gracefully fall back to default credentials. Add lazy GlueClient and LakeFormationClient getters to ClientFactory. Add lakeformation SDK dependency to build.sbt. --- X-AI-Prompt: add lake formation managed table support to spark connector X-AI-Tool: kiro-cli
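For reference, a hedged sketch of what the vending call can look like with the AWS SDK v2 lakeformation client; the helper's real code may differ, and the permission set shown is illustrative:

```scala
import software.amazon.awssdk.services.lakeformation.LakeFormationClient
import software.amazon.awssdk.services.lakeformation.model.{
  GetTemporaryGlueTableCredentialsRequest, Permission}

// Sketch only: vend table-scoped temporary credentials for a Glue table ARN.
def vendCredentials(client: LakeFormationClient, tableArn: String): LakeFormationCredentials = {
  val response = client.getTemporaryGlueTableCredentials(
    GetTemporaryGlueTableCredentialsRequest.builder()
      .tableArn(tableArn)
      .permissions(Permission.SELECT, Permission.INSERT) // illustrative set
      .build())
  LakeFormationCredentials(
    response.accessKeyId(),
    response.secretAccessKey(),
    response.sessionToken(),
    response.expiration())
}
```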
Wire LF detection and credential vending into batchIngestIntoOfflineStore for both Glue table (parquet) and Iceberg write paths. Add lfCredentials parameter to SparkSessionInitializer methods to configure Hadoop S3A with TemporaryAWSCredentialsProvider when LF credentials are present. Update tests with mock GlueClient setup. --- X-AI-Prompt: add lake formation managed table support to spark connector X-AI-Tool: kiro-cli
Add support for vending temporary LakeFormation credentials when ingesting data into LF-managed offline stores. Changes include:
- Add useLakeFormationCreds parameter to ingest_data/ingestData
- Exclude SLF4J from fat JAR to fix logging conflict with PySpark
- Bump LF fallback-to-default-creds log level to WARN
- Add logging to FeatureStoreManager for LF credential flow
- Increase Scala test coverage from 82% to 93% (new LF tests)
- Add scala-spark-sdk README with build instructions
--- X-AI-Prompt: add LF cred support, fix SLF4J conflict, add logging, increase test coverage, add Scala README X-AI-Tool: kiro-cli
…in LakeFormationHelperTest
Replace when(...).thenReturn(...) with doReturn(...).when(...) style stubbing in LakeFormationHelperTest to prevent scala.reflect.internal.Symbols$CyclicReference involving GetTemporaryGlueTableCredentialsResponse. The when/thenReturn form triggers reflective type resolution on the AWS SDK builder types which, under certain Spark version classpaths, causes a cyclic reference in Scala 2.12 compiler-generated metadata. The doReturn form avoids this by setting up the return value before invoking the mock method. x-ai-tool: kiro
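Illustratively, the doReturn style described above (the request/response types are from the AWS SDK; the mock setup below is a sketch, not the test's exact code):

```scala
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{doReturn, mock}
import software.amazon.awssdk.services.lakeformation.LakeFormationClient
import software.amazon.awssdk.services.lakeformation.model.{
  GetTemporaryGlueTableCredentialsRequest, GetTemporaryGlueTableCredentialsResponse}

val mockClient = mock(classOf[LakeFormationClient])
val stubbed = GetTemporaryGlueTableCredentialsResponse.builder()
  .accessKeyId("AKIA-TEST").build()

// doReturn registers the return value first and only then touches the mock,
// so scalac never has to eagerly resolve the SDK builder's type hierarchy.
doReturn(stubbed)
  .when(mockClient)
  .getTemporaryGlueTableCredentials(any(classOf[GetTemporaryGlueTableCredentialsRequest]))
```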
- Fix potential NPE in ingestDataInJava when useLakeFormationCreds is null from Java/PySpark by wrapping with Option().getOrElse(true)
- Override toString in LakeFormationCredentials to mask sensitive fields and prevent credential leakage in logs
- Store partition in LakeFormationCredentials for consistent refresh behavior instead of re-deriving from region
- Use DEBUG for success-path log messages, keep WARN for fallbacks
- Add Scaladoc for useLakeFormationCreds parameter
- Document hardcoded credential duration limitation
- Skip redundant GetTable call when useLakeFormationCreds is true, vend credentials directly and let failures fall back gracefully
- Remove unused checkAndVendCredentials method and Glue test fixtures
--- X-AI-Prompt: address code review issues 1-6, skip GetTable when LF creds enabled, remove checkAndVendCredentials X-AI-Tool: kiro-cli
Remove GlueClient from ClientFactory (import, singleton var, lazy getter, test setter, initialize reset, factory method), its mock from FeatureStoreManagerTest, and the glue SDK dependency from build.sbt. No production code ever called methods on the GlueClient. The LakeFormation credential vending uses the LF SDK directly, not the Glue SDK. --- X-AI-Prompt: find all glue clients and remove unused dead code X-AI-Tool: kiro-cli
Rename the Lake Formation parameter across Scala and Python SDKs to align with AWS naming conventions (matches Glue Crawler's UseLakeFormationCredentials parameter):
- Scala: useLakeFormationCreds -> useLakeFormationCredentials
- Python: use_lakeformation_creds -> use_lake_formation_credentials
Also change the default value from true to false in all method signatures (ingestData, ingestDataInJava, writeToOfflineStore) and update doc comments accordingly.
--- X-AI-Prompt: rename useLakeFormationCreds/use_lakeformation_creds to useLakeFormationCredentials/use_lake_formation_credentials, change default to false X-AI-Tool: kiro-cli
Remove pyspark-sdk/__pycache__/__init__.cpython-310.pyc from git index. This file was accidentally committed in e5c6e32 and is already covered by the **/__pycache__ gitignore rule. --- X-AI-Prompt: remove accidentally tracked pycache file from git index X-AI-Tool: kiro-cli
…s backward compatible
…on_credentials
The default for use_lake_formation_credentials was changed from True to False. Update the test assertion that relies on the default value to expect False instead of True. --- X-AI-Prompt: fix failing unit test after renaming argument and changing default to false X-AI-Tool: kiro-cli
- Bump VERSION from 1.3.0 to 2.0.0 - Revert sbt-sonatype from 3.11.3 to 3.9.10 (unrelated to LF feature) - Remove scala-spark-sdk/README.md (will be added in a separate PR) --- X-AI-Prompt: version bump to 2.0, revert sonatype plugin, remove README from branch X-AI-Tool: kiro
def refreshIfNeeded(credentials: LakeFormationCredentials): LakeFormationCredentials = {
Curious about this, when and where does the refresh happen?
we call it in batchIngestIntoOfflineStore
like here https://github.com/aws/sagemaker-feature-store-spark/pull/57/changes#diff-570e7dae04e1411a2e05570bc71c7a546c88683dcb5f9b4bcad0545cf4f03f77R332
right before we try to write, to make sure they are not expired
Is there a scenario where workers are still writing to S3 using Lake Formation credentials, but the credentials expire mid-write and cause partial failures? My understanding is that credentials are refreshed once right before the workers start writing.
it is possible, yes, if the write takes longer than 1 hour. This would only happen for a very large DataFrame; in that case the customer should break the data down, I think
just added a note in README about that limitation
// only compile on Spark 3.5+.
Test / unmanagedSourceDirectories += {
  val baseDir = baseDirectory.value
  if (majorSparkVersion.toDouble >= 3.5) {
What happens in the case of Spark version "3.10".toDouble? Just want to ensure that it doesn't break for future versions
it will just compile the tests at ./src/test/scala-spark-3.5; given that we only support up to version 3.5, this should not be an issue
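A sketch completing the truncated snippet above. The original diff uses `+=`; `++=` is used here so the pre-3.5 branch can add nothing, `majorSparkVersion` is assumed to be defined elsewhere in build.sbt, and the `toDouble` comparison shown is the one replaced by a later commit:

```scala
// Sketch: compile the LF tests only when building against Spark 3.5+.
Test / unmanagedSourceDirectories ++= {
  val baseDir = baseDirectory.value
  if (majorSparkVersion.toDouble >= 3.5)
    Seq(baseDir / "src" / "test" / "scala-spark-3.5")
  else
    Seq.empty
}
```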
…in README
The README referenced a non-existent direct_offline_store/directOfflineStore parameter in both the Getting Started and Lake Formation sections. Replace all 6 occurrences with the actual target_stores/targetStores parameter and update the description text to reflect the list-based API. --- X-AI-Prompt: fix incorrect direct_offline_store references in README to match actual API X-AI-Tool: kiro
Extract configureS3aCredentials private method to eliminate duplicate bucket-scoped hadoop credential configuration in both initializeSparkSessionForOfflineStore and initializeSparkSessionForIcebergTable. --- X-AI-Prompt: refactor duplicate S3A credential config code into shared helper X-AI-Tool: kiro
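A minimal sketch of such a helper, assuming Hadoop S3A's standard per-bucket options and TemporaryAWSCredentialsProvider; the PR's exact signature may differ:

```scala
import org.apache.hadoop.conf.Configuration

// Sketch: scope the vended credentials to a single bucket so every other
// S3 path keeps using the default credential chain.
private def configureS3aCredentials(
    hadoopConf: Configuration,
    bucket: String,
    credentials: LakeFormationCredentials): Unit = {
  val prefix = s"fs.s3a.bucket.$bucket"
  hadoopConf.set(s"$prefix.aws.credentials.provider",
    "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
  hadoopConf.set(s"$prefix.access.key", credentials.accessKeyId)
  hadoopConf.set(s"$prefix.secret.key", credentials.secretAccessKey)
  hadoopConf.set(s"$prefix.session.token", credentials.sessionToken)
}
```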
…edential request
The connector only appends Parquet files and never deletes S3 objects, so Permission.DELETE (which maps to s3:DeleteObject) is not needed. This follows the principle of least privilege. --- X-AI-Prompt: remove Permission.DELETE from LF credential vending request and README X-AI-Tool: kiro
Replace toDouble-based version comparison with integer major.minor parsing so that versions like 3.10 are correctly compared as greater than 3.5 instead of being truncated to 3.1 by floating-point parsing.
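A sketch of the integer-based comparison (function name is illustrative):

```scala
// "3.10" parses to (3, 10) and correctly compares greater than (3, 5),
// whereas "3.10".toDouble yields 3.1 and fails a >= 3.5 check.
def isAtLeast(version: String, reqMajor: Int, reqMinor: Int): Boolean = {
  val parts = version.split("\\.")
  val major = parts(0).toInt
  val minor = if (parts.length > 1) parts(1).toInt else 0
  major > reqMajor || (major == reqMajor && minor >= reqMinor)
}
```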
datetime.fromtimestamp() without tz used the local timezone to build the year/month/day/hour partition path, which would produce incorrect paths on machines not set to UTC. Pass tz=timezone.utc explicitly.
Document that Lake Formation temporary credentials can expire during long-running Spark writes, causing S3 403 errors. Recommend batching large DataFrames and calling ingestData per batch to vend fresh credentials.
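A hypothetical sketch of that recommendation; the FeatureStoreManager import path and the two-argument ingestData call are assumptions, and only the batch-and-re-ingest advice comes from the README note:

```scala
import org.apache.spark.sql.DataFrame
import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager

// Sketch: split a large DataFrame and ingest per batch so each call vends
// fresh LF credentials and no single write outlives the credential lifetime.
def ingestInBatches(manager: FeatureStoreManager, df: DataFrame,
                    featureGroupArn: String, numBatches: Int): Unit = {
  // randomSplit normalizes the weights into numBatches roughly equal parts
  df.randomSplit(Array.fill(numBatches)(1.0)).foreach { batch =>
    manager.ingestData(batch, featureGroupArn)
  }
}
```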
import string
from typing import List
nit: if we are importing already why are we also importing DataFrame explicitly?
Issue
Add Lake Formation credential vending support to the SageMaker Feature Store Spark connector, enabling secure S3 access through Lake Formation-scoped temporary credentials instead of relying on the caller's IAM permissions.
Description
This PR introduces an opt-in `useLakeFormationCredentials` parameter (Scala) / `use_lake_formation_credentials` parameter (Python) to `ingestData`/`ingest_data`. When enabled, the connector vends temporary credentials via `GetTemporaryGlueTableCredentials` and configures Hadoop S3A with per-bucket credentials scoped to the offline store's S3 location.

Key behaviors:

- Credentials are configured per-bucket (`fs.s3a.bucket.<bucket>.*`), so only the target offline store bucket uses LF-scoped credentials
- If credential vending fails, a `RuntimeException` is thrown with an actionable error message -- the connector does not fall back silently
- Magic committer configuration auto-detects EMR's `SQLEmrOptimizedCommitProtocol`, falls back to open-source `spark-hadoop-cloud`, or fails fast with a clear error
- Opt-in: `useLakeFormationCredentials` defaults to `false`
- Requires Spark 3.5+ (raises `ValueError` on PySpark < 3.5)

Key Changes
Scala
- `LakeFormationCredentials` -- case class with expiration tracking (`isExpiringSoon` with 5-min buffer)
- `LakeFormationHelper` -- singleton handling credential vending, automatic refresh, Glue table ARN construction, and LF prefix seeding
- `FeatureGroupArnResolver` -- extended with `resolveAccountId()` and `resolvePartition()` for Glue table ARN construction (supports China and GovCloud partitions)
- `ClientFactory` -- extended with lazy `LakeFormationClient` getter
- `SparkSessionInitializer` -- per-bucket S3A `TemporaryAWSCredentialsProvider` config and S3A magic committer setup (EMR auto-detect, open-source fallback, fail-fast)
- `MinSparkVersionGate` -- build-time Spark 3.5+ gating via version-specific source directories
- `FeatureStoreManager` -- `useLakeFormationCredentials` parameter added to `ingestData`/`ingestDataInJava` (defaults to `false`)
- `lakeformation` SDK added to `build.sbt` dependencies
- `sbt-sonatype` plugin reverted to 3.9.10

Python
- `FeatureStoreManager.ingest_data()` -- gains `use_lake_formation_credentials` parameter (default `False`)
- Raises `ValueError` if PySpark < 3.5 and `use_lake_formation_credentials=True`

API Changes
Scala
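A hedged sketch of the changed Scala entry point; the surrounding parameters are assumptions, and only the new flag and its `false` default are established by this PR:

```scala
import org.apache.spark.sql.DataFrame

// Sketch of the signature change only -- not the PR's exact parameter list.
def ingestData(
    inputDataFrame: DataFrame,
    featureGroupArn: String,
    useLakeFormationCredentials: Boolean = false): Unit = ???
```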
Python
Testing
Unit Tests
- `LakeFormationHelper`: vend success, vend failure, credential refresh
- `LakeFormationCredentials`: expiry and `isExpiringSoon` logic
- `FeatureGroupArnResolver`: account ID/partition resolution including China and GovCloud ARNs
- `SparkSessionInitializer`: magic committer config (EMR, non-EMR, missing), per-bucket LF credential config
- `FeatureStoreManagerLakeFormationTest` (Spark 3.5+ only): verifies `vendCredentials` is not called when LF is disabled
- `use_lake_formation_credentials` parameter passthrough to JVM

Integration Tests
- `LakeFormationHiveIngestionTest.py` -- end-to-end ingestion with LF credentials against a Glue (Hive-partitioned) offline store
- `LakeFormationIcebergIngestionTest.py` -- end-to-end ingestion with LF credentials against an Iceberg offline store

Prerequisites
Users enabling Lake Formation credential vending must ensure:
- A `LakeFormationConfig` passed to `FeatureGroupManager.create()` handles registration automatically
- `lakeformation:GetDataAccess`
- `lakeformation:GetTemporaryGlueTableCredentials`
- `glue:GetTable`, `glue:GetDatabase`, `glue:GetPartitions`
- `sagemaker:DescribeFeatureGroup`
- Lake Formation table permissions: `SELECT`, `INSERT`, `DELETE`, `DESCRIBE`
- `AllowExternalDataFiltering: true`
- `AllowFullTableExternalDataAccess: true`
- `ExternalDataFilteringAllowList` includes the account running the Spark job
- `org.apache.spark:spark-hadoop-cloud_2.12:<spark-version>` via `--packages` or `spark.jars.packages`

Merge Checklist
Put an `x` in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your pull request.

General
Tests
Manual Testing
python test_lf_ingestion.py --feature-group-arn arn:aws:sagemaker:us-west-2:550124139430:feature-group/Lakeformation-Managed-FG-iceberg

I verified the records made it to the Glue table using Athena:
| # | write_time | api_invocation_time | is_deleted | customer_id | event_time | age | total_purchases | avg_order_value |
|---|------------|---------------------|------------|-------------|------------|-----|-----------------|-----------------|
| 1 | 2026-04-21 16:45:44.999444 UTC | 2026-04-21 16:45:44.999444 UTC | false | cust-001 | 2026-04-21T16:45:32Z | 32 | 15 | 49.99 |
| 2 | 2026-04-21 16:45:44.999444 UTC | 2026-04-21 16:45:44.999444 UTC | false | cust-002 | 2026-04-21T16:45:32Z | 45 | 8 | 120.5 |
| 3 | 2026-04-21 16:45:44.999444 UTC | 2026-04-21 16:45:44.999444 UTC | false | cust-003 | 2026-04-21T16:45:32Z | 28 | 22 | 35.75 |
The code was also tested on EMR using both Iceberg- and Hive-formatted Glue tables.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.