I have been playing around with Einstein Analytics (the thing they used to call Wave) and I wanted to automate the upload of data, since there is no point in having dashboards and lenses if the data is stale.
After using Lambda functions against the Bulk API, I wanted something similar here, and I found another nice project over at Heroku’s GitHub account called pyAnalyticsCloud.
I don’t have a Postgres database, so I ended up using only the uploader.py file and wrote this Lambda function to use it:
from __future__ import print_function

import json
from base64 import b64decode
import boto3
import uuid
import os
import logging
import unicodecsv

from uploader import AnalyticsCloudUploader

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3')

username = os.environ['SF_USERNAME']
encrypted_password = os.environ['SF_PASSWORD']
encrypted_security_token = os.environ['SF_SECURITYTOKEN']
password = boto3.client('kms').decrypt(CiphertextBlob=b64decode(encrypted_password))['Plaintext'].decode('ascii')
security_token = boto3.client('kms').decrypt(CiphertextBlob=b64decode(encrypted_security_token))['Plaintext'].decode('ascii')
file_bucket = os.environ['FILE_BUCKET']
wsdl_file_key = os.environ['WSDL_FILE_KEY']
metadata_file_key = os.environ['METADATA_FILE_KEY']


def bulk_upload(csv_path, wsdl_file_path, metadata_file_path):
    with open(csv_path, mode='r') as csv_file:
        logger.info('Initiating Wave Data upload.')

        logger.debug('Loading metadata')
        metadata = json.loads(open(metadata_file_path, 'r').read())

        logger.debug('Loading CSV data')
        data = unicodecsv.reader(csv_file)
        edgemart = metadata['objects'][0]['name']

        logger.debug('Creating uploader')
        uploader = AnalyticsCloudUploader(metadata, data)
        logger.debug('Logging in to Wave')
        uploader.login(wsdl_file_path, username, password, security_token)
        logger.debug('Uploading data')
        uploader.upload(edgemart)
        logger.info('Wave Data uploaded.')
        return 'OK'


def handler(event, context):
    for record in event['Records']:
        # Incoming CSV file
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        csv_path = '/tmp/{}{}'.format(uuid.uuid4(), key)
        s3_client.download_file(bucket, key, csv_path)

        # WSDL file
        wsdl_file_path = '/tmp/{}{}'.format(uuid.uuid4(), wsdl_file_key)
        s3_client.download_file(file_bucket, wsdl_file_key, wsdl_file_path)

        # Metadata file
        metadata_file_path = '/tmp/{}{}'.format(uuid.uuid4(), metadata_file_key)
        s3_client.download_file(file_bucket, metadata_file_key, metadata_file_path)

        return bulk_upload(csv_path, wsdl_file_path, metadata_file_path)
Yes, the logging is a bit on the extensive side. Make sure to add these environment variables in AWS Lambda:
SF_USERNAME - your SF username
SF_PASSWORD - your SF password (encrypted with KMS; see the sketch after this list for how to produce the value)
SF_SECURITYTOKEN - your SF security token (encrypted with KMS, same procedure)
FILE_BUCKET - the bucket where the WSDL and metadata files live
METADATA_FILE_KEY - the path to the metadata file in that bucket (you get this from Einstein Analytics)
WSDL_FILE_KEY - the path to the WSDL partner file in the bucket
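The two encrypted variables are just the KMS ciphertext, base64 encoded, which is what the b64decode/decrypt calls at the top of the function expect. Here is a minimal one-off sketch for producing them; the key alias 'alias/lambda-secrets' is a placeholder, use whichever KMS key your Lambda’s execution role is allowed to decrypt with:

from base64 import b64encode
import boto3

def encrypt_for_lambda(plaintext, key_id='alias/lambda-secrets'):  # hypothetical key alias
    # KMS returns raw ciphertext bytes; base64 encode them so the value can be
    # pasted straight into the Lambda environment variable.
    ciphertext = boto3.client('kms').encrypt(KeyId=key_id, Plaintext=plaintext)['CiphertextBlob']
    return b64encode(ciphertext)

print(encrypt_for_lambda('my-sf-password'))      # value for SF_PASSWORD
print(encrypt_for_lambda('my-security-token'))   # value for SF_SECURITYTOKEN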
I added an S3 trigger that runs this function as soon as a new file is uploaded. It has some issues (it crashes when the file name contains parentheses, for example), so please don’t use this for a production workload before making it enterprise grade.
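The parenthesis crash comes from the fact that S3 URL-encodes the object key in event notifications, so spaces arrive as '+' and parentheses as %28/%29, and download_file then looks for a key that doesn’t exist. A small helper along these lines (the name safe_local_path is mine, not part of the code above) would be one way to fix that, and it also keeps odd characters out of the /tmp path:

import urllib
import uuid

def safe_local_path(raw_key):
    # S3 event notifications URL-encode the object key, so decode it first.
    key = urllib.unquote_plus(raw_key.encode('utf8'))
    # Use only the base file name when building the /tmp path.
    filename = key.split('/')[-1]
    return key, '/tmp/{}-{}'.format(uuid.uuid4(), filename)

In the handler you would then do key, csv_path = safe_local_path(record['s3']['object']['key']) instead of using the raw key directly.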
Note: The code above only works in Python 2.7.
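If you want to check the function before wiring up the trigger, you can call the handler locally (on Python 2.7, with the environment variables above set and AWS credentials that can reach S3 and KMS) using a hand-built event; the module name, bucket and key below are placeholders:

# Assumes the Lambda code above is saved as lambda_function.py next to uploader.py.
from lambda_function import handler

fake_event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'my-csv-bucket'},        # placeholder bucket
            'object': {'key': 'exports/accounts.csv'}   # placeholder key
        }
    }]
}

print(handler(fake_event, None))  # should print 'OK' once the upload finishes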
Cheers