Connecting GE with S3

How do I load data from S3? I get an error even after setting up the AWS config and credentials files. When great_expectations init prompts “Enter the path of the file” and I enter the S3 file path, it gives an error. I understand that I need to use S3GlobReaderBatchKwargsGenerator for this, but how do I set batch kwargs before even initialising the project with great_expectations init?

A step-by-step explanation would be helpful. Thanks!

Thanks for reaching out! I think the easiest way forward would be to complete the great_expectations init process as you’ve been doing, selecting 1. Files on a filesystem (for processing with Pandas or Spark) for the first option. But instead of entering an S3 path when prompted for a filepath, enter the filepath of a small sample csv you have saved on your local machine. Since this csv will only be used to generate a sample Expectation Suite and validation results, you can delete these artifacts later. This way, you’ll get the Great Expectations project scaffold, with directory structure and config file, without having to worry about setting everything up manually.

Once you’ve completed the init process, open the great_expectations.yml file that was created - here, we’ll add the configuration for the S3GlobReaderBatchKwargsGenerator. The datasources section should look something like this:

datasources:
  files_datasource:
    module_name: great_expectations.datasource
    data_asset_type:
      module_name: great_expectations.dataset
      class_name: PandasDataset
    class_name: PandasDatasource

Since this datasource was set up in init, it lacks a batch_kwargs_generators section. After adding the appropriate config as outlined in https://docs.greatexpectations.io/en/latest/module_docs/generator_module.html?highlight=s3%20glob#s3globreaderbatchkwargsgenerator, the datasources section should now look something like this (filled in with your own info of course):

datasources:
  files_datasource:
    module_name: great_expectations.datasource
    data_asset_type:
      module_name: great_expectations.dataset
      class_name: PandasDataset
    class_name: PandasDatasource
    batch_kwargs_generators:
      my_s3_generator:
        class_name: S3GlobReaderBatchKwargsGenerator
        bucket: my_bucket.my_organization.priv
        reader_method: parquet  # This will be automatically inferred from suffix where possible, but can be explicitly specified as well
        reader_options:  # Note that reader options can be specified globally or per-asset
          sep: ","
        delimiter: "/"  # Note that this is the delimiter for the BUCKET KEYS. By default it is "/"
        boto3_options:
          endpoint_url: $S3_ENDPOINT  # Use the S3_ENDPOINT environment variable to determine which endpoint to use
        max_keys: 100  # The maximum number of keys to fetch in a single list_objects request to s3. When accessing batch_kwargs through an iterator, the iterator will silently refetch if more keys were available
        assets:
          my_first_asset:
            prefix: my_first_asset/
            regex_filter: .*  # The regex filter will filter the results returned by S3 for the key and prefix to only those matching the regex
            dictionary_assets: True
          access_logs:
            prefix: access_logs
            regex_filter: access_logs/2019.*\.csv.gz
            sep: "~"
            max_keys: 100
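
If you want to sanity-check a prefix / regex_filter pair before running the generator, a small stand-alone sketch like the one below can help. It assumes the generator keeps a key when the regex matches from the start of the full key (the way Python's re.match behaves). The helper name filter_keys and the sample keys are made up for illustration and are not part of Great Expectations.

```python
import re


def filter_keys(keys, prefix, regex_filter=".*"):
    """Hypothetical helper: mimic an S3 prefix listing followed by regex filtering."""
    # S3 only returns keys that start with the requested prefix.
    listed = [key for key in keys if key.startswith(prefix)]
    # Assumption: the regex is matched from the start of the full key.
    pattern = re.compile(regex_filter)
    return [key for key in listed if pattern.match(key)]


# Made-up keys, for illustration only.
sample_keys = [
    "access_logs/2019-01-01.csv.gz",
    "access_logs/2018-12-31.csv.gz",
    "access_logs/2019-01-01.json",
    "my_first_asset/part-0000.parquet",
]

# Same prefix and regex_filter as the access_logs asset in the config above.
matched = filter_keys(sample_keys, "access_logs", r"access_logs/2019.*\.csv.gz")
print(matched)
```

Only keys that pass both the prefix and the regex would end up as batches for that asset.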

You can also check out this post for another config example: “How to configure a PySpark datasource for accessing the data from AWS S3?” (You can also do the above programmatically if you have a data_context object, using data_context.add_batch_kwargs_generator(datasource_name, batch_kwargs_generator_name, class_name, **kwargs), passing in the config as kwargs.)
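
As a rough sketch of the programmatic route, the wrapper below forwards the same settings you would otherwise put in great_expectations.yml. The function name add_s3_generator is hypothetical; it only relies on the add_batch_kwargs_generator call mentioned above and assumes the extra keyword arguments (bucket, assets) are accepted the same way as the YAML keys.

```python
def add_s3_generator(context, datasource_name, generator_name, bucket, assets):
    """Hypothetical wrapper: register an S3 glob generator on an existing datasource.

    `context` is expected to be a Great Expectations DataContext. The keyword
    arguments are assumed to mirror the YAML keys of the same name.
    """
    return context.add_batch_kwargs_generator(
        datasource_name,
        generator_name,
        class_name="S3GlobReaderBatchKwargsGenerator",
        bucket=bucket,
        assets=assets,
    )


# Usage (needs great_expectations installed and an initialised project):
#   from great_expectations.data_context import DataContext
#   context = DataContext()
#   add_s3_generator(
#       context,
#       "files_datasource",
#       "my_s3_generator",
#       bucket="my_bucket.my_organization.priv",
#       assets={"access_logs": {"prefix": "access_logs"}},
#   )
```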

Once you have the generator set up properly, you can run great_expectations datasource profile YOUR_DATASOURCE_NAME in the CLI to generate expectation suites using batches yielded by the S3 generator.

Next, you can start playing with the sample validation notebooks found at great_expectations/notebooks/pandas/validation_playground.ipynb (for Pandas datasources). Since you’ll have a batch_kwargs_generator configured, instead of providing batch_kwargs manually as shown in the notebook, you can call context.build_batch_kwargs(datasource="datasource_name", batch_kwargs_generator="my_batch_kwargs_generator_name") to yield batch_kwargs, which you can then pass to context.get_batch(batch_kwargs, expectation_suite_name).
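
Put together, the two calls might look like the sketch below. The function name get_s3_batch is hypothetical, and the data_asset_name keyword is an assumption about your installed version, so please check the build_batch_kwargs signature in your release before relying on it.

```python
def get_s3_batch(context, datasource_name, generator_name, asset_name, suite_name):
    """Hypothetical helper: let the generator build batch_kwargs, then load the batch.

    `context` is expected to be a Great Expectations DataContext.
    """
    # Assumption: the asset is selected with a `data_asset_name` keyword.
    batch_kwargs = context.build_batch_kwargs(
        datasource=datasource_name,
        batch_kwargs_generator=generator_name,
        data_asset_name=asset_name,
    )
    # The batch is validated against the named Expectation Suite.
    return context.get_batch(batch_kwargs, suite_name)


# Usage (needs great_expectations installed and an initialised project):
#   from great_expectations.data_context import DataContext
#   context = DataContext()
#   batch = get_s3_batch(
#       context, "files_datasource", "my_s3_generator", "access_logs", "my_suite"
#   )
```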

If, for some reason, the batch_kwargs_generators configuration is still giving you issues, you can always just construct batch_kwargs yourself (since that’s all the batch_kwargs_generators do). For S3, batch_kwargs should have the form:

{
    "s3": "s3a://BUCKET/KEY",
    "reader_options": {...},
    "reader_method": "...",
    "limit": ...
}
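
If you go the manual route, a tiny helper along these lines can build that dictionary for you. The name build_s3_batch_kwargs and the sample bucket/key are made up for illustration; the only thing taken from the form above is the set of dictionary keys and the s3a:// prefix.

```python
def build_s3_batch_kwargs(bucket, key, reader_method=None, reader_options=None, limit=None):
    """Hypothetical helper: assemble batch_kwargs for a single S3 object."""
    batch_kwargs = {"s3": f"s3a://{bucket}/{key}"}
    # Optional entries are only added when a value was supplied.
    if reader_method is not None:
        batch_kwargs["reader_method"] = reader_method
    if reader_options is not None:
        batch_kwargs["reader_options"] = reader_options
    if limit is not None:
        batch_kwargs["limit"] = limit
    return batch_kwargs


# Made-up bucket and key, for illustration only.
example_kwargs = build_s3_batch_kwargs(
    "my_bucket", "my_first_asset/data.csv", reader_options={"sep": ","}, limit=1000
)
print(example_kwargs)
```

The resulting dictionary is what you would pass to context.get_batch(batch_kwargs, expectation_suite_name).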

(if you’re curious, you can check out the source for building the batch_kwargs here: https://github.com/great-expectations/great_expectations/blob/f5abb426d3837c587846d91157c9a663d6698c4d/great_expectations/datasource/batch_kwargs_generator/s3_batch_kwargs_generator.py#L188)

Hey, thanks for your detailed response, @roblim. However, I am facing issues with the config part. Even after setting up the access key, secret key and region using aws config, I am getting the following error: “ValueError: Unable to load datasource s3://idl-deng-staging-raw-uw2-processing-cg-dev/GE_test/npidata_pfile_20190902-20190908.csv – no configuration found or invalid configuration.”
I’d be grateful if you would help me out with that. Thanks!

Can you post a code snippet of the call that led to that exception? From the error, it looks like s3://idl-deng-staging-raw-uw2-processing-cg-dev/GE_test/npidata_pfile_20190902-20190908.csv is being passed somewhere as the datasource_name. If it’s not sensitive, can you also share your great_expectations.yml?

Lastly, just to make sure this isn’t an AWS issue, can you check in a notebook whether you can retrieve that S3 key using boto3?

e.g.:

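import boto3

# Bucket and key taken from the S3 path in the error message above
bucket = "idl-deng-staging-raw-uw2-processing-cg-dev"
s3_object_key = "GE_test/npidata_pfile_20190902-20190908.csv"

s3 = boto3.client("s3")
s3_response_object = s3.get_object(Bucket=bucket, Key=s3_object_key)
# Read the object body and decode it to text
object_contents = (
    s3_response_object["Body"]
    .read()
    .decode(s3_response_object.get("ContentEncoding", "utf-8"))
)
print(object_contents[:500])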
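
Note that boto3 wants the bucket and key separately rather than the full s3:// path. If you only have the full path, a standard-library sketch like this can split it (the helper name split_s3_path is made up for illustration):

```python
from urllib.parse import urlparse


def split_s3_path(s3_path):
    """Hypothetical helper: split an s3://bucket/key path into (bucket, key)."""
    parsed = urlparse(s3_path)
    if parsed.scheme not in ("s3", "s3a", "s3n"):
        raise ValueError(f"Not an S3 path: {s3_path}")
    # netloc holds the bucket; the path holds the key with a leading slash.
    return parsed.netloc, parsed.path.lstrip("/")


bucket, s3_object_key = split_s3_path(
    "s3://idl-deng-staging-raw-uw2-processing-cg-dev/GE_test/npidata_pfile_20190902-20190908.csv"
)
print(bucket)
print(s3_object_key)
```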