I have been playing around with Einstein Analytics (the thing they used to call Wave) and I wanted to automate the data upload, since there's no point in having dashboards and lenses if the data is stale.
After using Lambda functions against the Bulk API I wanted something similar here, and I found another nice project over at Heroku's GitHub account called pyAnalyticsCloud.
I don't have a Postgres database, so I ended up using only the uploader.py file and wrote this Lambda function to use it:
from __future__ import print_function
import json
from base64 import b64decode
import boto3
import uuid
import os
import logging
import unicodecsv
from uploader import AnalyticsCloudUploader
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3_client = boto3.client('s3')
username = os.environ['SF_USERNAME']
encrypted_password = os.environ['SF_PASSWORD']
encrypted_security_token = os.environ['SF_SECURITYTOKEN']
password = boto3.client('kms').decrypt(CiphertextBlob=b64decode(encrypted_password))['Plaintext'].decode('ascii')
security_token = boto3.client('kms').decrypt(CiphertextBlob=b64decode(encrypted_security_token))['Plaintext'].decode('ascii')
file_bucket = os.environ['FILE_BUCKET']
wsdl_file_key = os.environ['WSDL_FILE_KEY']
metadata_file_key = os.environ['METADATA_FILE_KEY']
def bulk_upload(csv_path, wsdl_file_path, metadata_file_path):
    with open(csv_path, mode='r') as csv_file:
        logger.info('Initiating Wave Data upload.')
        logger.debug('Loading metadata')
        with open(metadata_file_path, 'r') as metadata_file:
            metadata = json.load(metadata_file)
        logger.debug('Loading CSV data')
        data = unicodecsv.reader(csv_file)
        edgemart = metadata['objects'][0]['name']
        logger.debug('Creating uploader')
        uploader = AnalyticsCloudUploader(metadata, data)
        logger.debug('Logging in to Wave')
        uploader.login(wsdl_file_path, username, password, security_token)
        logger.debug('Uploading data')
        uploader.upload(edgemart)
        logger.info('Wave Data uploaded.')
        return 'OK'
def handler(event, context):
    for record in event['Records']:
        # Incoming CSV file
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        csv_path = '/tmp/{}{}'.format(uuid.uuid4(), key)
        s3_client.download_file(bucket, key, csv_path)
        # WSDL file
        wsdl_file_path = '/tmp/{}{}'.format(uuid.uuid4(), wsdl_file_key)
        s3_client.download_file(file_bucket, wsdl_file_key, wsdl_file_path)
        # Metadata file
        metadata_file_path = '/tmp/{}{}'.format(uuid.uuid4(), metadata_file_key)
        s3_client.download_file(file_bucket, metadata_file_key, metadata_file_path)
        return bulk_upload(csv_path, wsdl_file_path, metadata_file_path)
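For reference, the handler only cares about the bucket name and object key in the incoming event, so a minimal test event looks roughly like this (the bucket and key names are made up, and the module expects the environment variables listed below to be set and decryptable when it is imported):
# Minimal S3 put event for testing the handler; bucket and key are placeholders
test_event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'my-csv-bucket'},
            'object': {'key': 'uploads/accounts.csv'}
        }
    }]
}
# handler(test_event, None)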
Yes, the logging is a bit on the extensive side. Also make sure to add these environment variables in AWS Lambda:
SF_USERNAME - your SF username
SF_PASSWORD - your SF password (encrypted with KMS)
SF_SECURITYTOKEN - your SF security token (encrypted with KMS - see the sketch after this list)
FILE_BUCKET - the bucket that holds the metadata and WSDL files
METADATA_FILE_KEY - the path to the metadata file in that bucket (you get this from Einstein Analytics)
WSDL_FILE_KEY - the path to the partner WSDL file in that bucket
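If you haven't encrypted the password and security token yet, here is a minimal sketch of doing it with boto3 and KMS; the key alias is a placeholder, so use whichever KMS key the Lambda execution role is allowed to decrypt with:
import boto3
from base64 import b64encode

kms = boto3.client('kms')
# 'alias/lambda-secrets' is a placeholder key alias
ciphertext = kms.encrypt(KeyId='alias/lambda-secrets',
                         Plaintext='my-sf-password')['CiphertextBlob']
# Paste this base64 value into the SF_PASSWORD environment variable
print(b64encode(ciphertext))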
I added an S3 trigger that runs this function as soon as a new file is uploaded. It has some issues (crashing on parentheses in the file name, for example), so please don't use this for a production workload before making it enterprise grade.
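The parentheses crash is most likely because the object key arrives URL-encoded in the S3 event (spaces come through as + and special characters may be percent-encoded), so a first hardening step would be to decode the key inside the handler, something like this in Python 2.7:
import urllib

# Decode the URL-encoded object key from the S3 event before using it
key = urllib.unquote_plus(record['s3']['object']['key'].encode('utf8'))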
Note: The code above only works in Python 2.7
Cheers