Uploading CSV data to Einstein Analytics with AWS Lambda (Python)

Posted by Johan on Friday, October 6, 2017

I have been playing around with Einstein Analytics (the thing they used to call Wave) and I wanted to automate the upload of data since there’s no reason on having dashboards and lenses if the data is stale.

After using Lambda functions against the Bulk API I wanted to have something similar and I found another nice project over at Heroku’s GitHub account called pyAnalyticsCloud

I don’t have a Postgres Database so I ended up using only the uploader.py file and wrote this Lambda function to use it:

from __future__ import print_function

import json
from base64 import b64decode
import boto3
import uuid
import os
import logging
import unicodecsv
from uploader import AnalyticsCloudUploader

logger = logging.getLogger()

s3_client = boto3.client('s3')
username = os.environ['SF_USERNAME']
encrypted_password = os.environ['SF_PASSWORD']
encrypted_security_token = os.environ['SF_SECURITYTOKEN']
password = boto3.client('kms').decrypt(CiphertextBlob=b64decode(encrypted_password))['Plaintext'].decode('ascii')
security_token = boto3.client('kms').decrypt(CiphertextBlob=b64decode(encrypted_security_token))['Plaintext'].decode('ascii')
file_bucket = os.environ['FILE_BUCKET']
wsdl_file_key = os.environ['WSDL_FILE_KEY']
metadata_file_key = os.environ['METADATA_FILE_KEY']

def bulk_upload(csv_path, wsdl_file_path, metadata_file_path):
    with open(csv_path, mode='r') as csv_file:
        logger.info('Initiating Wave Data upload.')
        logger.debug('Loading metadata')
        metadata = json.loads(open(metadata_file_path, 'r').read())

        logger.debug('Loading CSV data')
        data = unicodecsv.reader(csv_file)
        edgemart = metadata['objects'][0]['name']

        logger.debug('Creating uploader')
        uploader = AnalyticsCloudUploader(metadata, data)
        logger.debug('Logging in to Wave')
        uploader.login(wsdl_file_path, username, password, security_token)
        logger.debug('Uploading data')
        logger.info('Wave Data uploaded.')
        return 'OK'

def handler(event, context):
    for record in event['Records']:
        # Incoming CSV file
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        csv_path = '/tmp/{}{}'.format(uuid.uuid4(), key)
        s3_client.download_file(bucket, key, csv_path)

        # WSDL file
        wsdl_file_path = '/tmp/{}{}'.format(uuid.uuid4(), wsdl_file_key)
        s3_client.download_file(file_bucket, wsdl_file_key, wsdl_file_path)

        # Metadata file
        metadata_file_path = '/tmp/{}{}'.format(uuid.uuid4(), metadata_file_key)
        s3_client.download_file(file_bucket, metadata_file_key, metadata_file_path)
        return bulk_upload(csv_path, wsdl_file_path, metadata_file_path)

Yes the logging is a bit on the extensive side and make sure to add these environment variables in AWS Lambda: SF_USERNAME - your SF username SF_PASSWORD - your SF password (encrypted) SF_SECURITYTOKEN - your SF security token (encrypted) FILE_BUCKET- the bucket in where to find the mapping file METADATA_FILE_KEY- the path to the metadata file in that bucket (you get this from Einstein Analytics) WSDL_FILE_KEY - the path to the wsdl partner file in the bucket

I added an S3 trigger that runs this function as soon as a new file is uploaded. It has some issues (crashing with parenthesis in the file name for example) so please don’t use this for a production workload before making it enterprise grade.

Note: The code above only works in Python 2.7