Validate only latest file in S3

Is there a way to specify batch_kwargs to pick up the latest files from an S3 bucket? I’d like to validate only the latest drop in each week. Some upstream data gets dropped in a bucket at a path like s3://incoming/2020-05-26. Next week the path will be different, and I don’t want to reprocess anything that was already processed, which is why I think a regex wouldn’t work.

Many thanks!

Hi! This is a great use case for GE. Can you tell me a little more about your technical stack and how you are planning on deploying validations so I can suggest an appropriate solution?

Sure! We use Spark and run all spark jobs as containers (in cluster mode - although I’m thinking that even spark local to container might be enough, depending on the size of the files to validate). I was going to build a docker image with the expectation suite and pyspark built in, and the entrypoint in the container would be a bash script consisting of some great_expectations command line commands (or an auto-generated script?). So datasource will be S3, and the processing engine will be Spark.

Jobs are scheduled via a simple cluster scheduler (not Airflow): you give it the image to run and the schedule to run it on (and optionally some environment variables).

All datadocs should be published to a private S3 bucket. (That’s a minor annoyance - no data should leave the cluster/s3, so how do I actually view private html files with validation results?)

I started off with 0.10.x and am just now experimenting with 0.11.x. I see the interface has changed?

Thanks so much!

Thanks for the details!

Ok, the bottom line is that you’ll need to dynamically adjust your batch_kwargs by writing a small python function that has your latest file logic in it. This small function could return the S3 path of the target file(s).
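
As a rough illustration of such a function, here is a minimal sketch. It assumes the drops are top-level prefixes named by date (as in s3://incoming/2020-05-26) and that you keep your own record of which prefixes were already processed. The names latest_drop_prefix, list_top_level_prefixes and already_processed are made up for this example and are not part of Great Expectations.

```python
from datetime import datetime
from typing import Collection, Iterable, List, Optional


def latest_drop_prefix(
    prefixes: Iterable[str],
    already_processed: Collection[str] = (),
) -> Optional[str]:
    """Return the newest date-named prefix that has not been processed yet.

    Prefixes whose last path segment is not a YYYY-MM-DD date are ignored.
    Returns None when nothing new is available.
    """
    candidates = []
    for prefix in prefixes:
        name = prefix.rstrip("/").split("/")[-1]
        try:
            drop_date = datetime.strptime(name, "%Y-%m-%d").date()
        except ValueError:
            continue  # not a dated drop folder, skip it
        if prefix not in already_processed:
            candidates.append((drop_date, prefix))
    return max(candidates)[1] if candidates else None


def list_top_level_prefixes(bucket: str) -> List[str]:
    """List the top-level "folders" of a bucket using boto3.

    Assumes boto3 is installed and AWS credentials are configured.
    """
    import boto3  # imported here so the pure function above has no dependency

    client = boto3.client("s3")
    paginator = client.get_paginator("list_objects_v2")
    prefixes = []
    for page in paginator.paginate(Bucket=bucket, Delimiter="/"):
        for common_prefix in page.get("CommonPrefixes", []):
            prefixes.append(common_prefix["Prefix"])
    return prefixes
```

With the bucket from the question, the call might look like latest_drop_prefix(list_top_level_prefixes("incoming"), already_processed=...), where the set of processed prefixes comes from wherever you track finished runs.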

To deploy this, I would recommend using the checkpoint feature. You’ll first need to make a new checkpoint using great_expectations checkpoint new. Then you can use the great_expectations checkpoint script feature to generate a python script that you can use in your spark job.

Your custom function would live in that script and would modify the batch_kwargs after the checkpoint is loaded (at that point, batch_kwargs is just a plain Python dictionary).
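
To make the dictionary update concrete, here is a small sketch. It assumes the generated script holds batch_kwargs with a "path" key pointing at the data and a "datasource" key, which is typical for path-based batches but worth checking against your own generated script. The helper with_latest_path, the datasource name and the s3a scheme are assumptions for illustration.

```python
def with_latest_path(batch_kwargs: dict, bucket: str, prefix: str,
                     scheme: str = "s3a") -> dict:
    """Return a copy of batch_kwargs whose "path" points at the given prefix.

    The original dictionary is left untouched so the checkpoint's stored
    configuration is not mutated by accident.
    """
    updated = dict(batch_kwargs)
    updated["path"] = f"{scheme}://{bucket}/{prefix}"
    return updated


# Example: batch_kwargs as they might appear after the checkpoint is loaded.
# The datasource name and the original path are placeholders.
loaded_batch_kwargs = {
    "datasource": "my_spark_datasource",
    "path": "s3a://incoming/2020-05-19/",
}
new_batch_kwargs = with_latest_path(loaded_batch_kwargs, "incoming", "2020-05-26/")
```

The updated dictionary would then be used in place of the original one wherever the generated script builds its batch.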

Here’s an article that describes the usage of these commands:
