亚马逊S3
本指南演示了如何将无服务器的摄入管道从AWS S3存储桶中设置为Mixpanel。设置此设置后,您可以简单地将包含事件的文件上传到指定的S3存储桶中,并将事件摄入到一次性和经常性的Mixpanel中。设置应花费约5-10分钟。
先决条件
本指南假设您正在亚马逊网络服务中运行,并且具有必要的IAM访问权限,可以从S3中读取AWS lambda。
步骤1:创建一个S3存储桶
创建一个专用的S3存储桶为此集成。我们建议包括Mixpanel-Import
用名称使其明确并避免任何意外数据共享。
您可以在任何区域中创建水桶,尽管它需要与您的AWS lambda函数在同一区域。
步骤2a:设置lambda功能
创建一个新的lambda功能。
- 选择
作者从头开始
。 - 将运行时间设置为
Python 3.7
。 - 选择
创建功能
。
步骤2B:编写lambda功能
更改文件名lambda_function.py
到main.py
并编辑运行时设置
处理程序Main.Handler
。粘贴以下代码main.py
。
从dateTime导入日期从io导入textiowrapper导入gzip import import import import import import import import import urllib.parse导入import os import os import boto3 import project_id =“”#www.jy710.com/project/ user = user =“”#“”#服务帐户密码#一旦击中这些限制,将这些限制点击events_per_batch = 2000 bytes_per_batch = 2 * 1024 * 1024 * 1024#调整测试数据的事件时间戳,以使其在Mixpanel中显示出最新的。#注意:这仅用于演示目的。#用自己的生产转换替换。def transform_to_event(line):“”““将文件的线转换为Mixpanel格式中的JSON字符串。今天()。strftime(“%s”))返回json.dumps(event)def flush(batch):pareload = gzip.compress(“ \ n” .join(batch).encode(“ utf-8”))tries = 0 while true:wess = requests.post(“ https://api.www.jy710.com/import”,params = {“ strict”:“ 1”,“ project_id”:project_id},headers = {“ content-type“:”应用程序/x-ndjson,“ content-insoding”:“ gzip”,“ user-agent”:“ mixpanel-s3”},auth =(user,pass),data = pareload,如果有swess。status_code == 429或resp.status_code> = 500:time.sleep(min(2 ** tries,60) + andant.randint.randint(1,5))尝试 + = 1继续返回revers def def处理程序(事件,上下文):s3 = boto3.client('s3')bucket = event ['records'] [0] ['s3'] ['bucket'] ['name'][0] ['s3'] ['object'] ['键'],编码='utf-8')blob = s3.get_object(buck_object(bucket = bucket = key = key),如果blob ['wonsevensemetadata'] [''] [''''httpheaders'] ['content-type'] =='应用程序/x-gzip':gzipped = gzip.gzipfile(none,'rb',file obj = blob ['hody'])data = textiowrapper(gzpipped).read(read).read()blob ['body']。read()。解码('utf-8')。splitlines()error = note batch,batch_bytes = [],数据中的line:#for data:#在这里我们假设每行都是有效的json代表一个 single Mixpanel event # If lines are not in Mixpanel's format, apply any transformations here. line = transform_to_event(line) batch.append(line) batch_bytes += len(line) if len(batch) == EVENTS_PER_BATCH or batch_bytes >= BYTES_PER_BATCH: resp = flush(batch) batch, batch_bytes = [], 0 if resp.status_code != 200: error = resp.json() break # Flush final batch if batch and not error: resp = flush(batch) if resp.status_code != 200: error = resp.json() print( json.dumps( { "message": "Import complete", "request": event, "success": error is None, "first_error": error, "severity": "INFO", } ) )
验证
不要忘记填写
Project_ID
,,,,用户
,,,,经过
带有您的Project_ID和服务帐户凭据的变量。
步骤2C:添加触发器
添加触发器,以便每当将新对象添加到您的存储桶中时,您的lambda函数就可以运行添加触发器
在下面功能概述
。您将需要使用步骤2中创建的存储桶,然后选择所有对象创建事件
。
步骤2D:更新配置和部署
选择配置
>一般配置
>编辑
。
- 改变
记忆
到1024 MB
。 - 改变
暂停
到5分钟
。
点击版本
>发布新版本
部署功能。在这一点上,您上传到存储桶的任何文件都会触发该功能的调用,并将导入到Mixpanel中。让我们测试一下!
步骤3:使用示例数据测试并观看日志
让我们测试一些连接样本事件。运行以下内容将它们复制到您的存储桶中,这将触发导入:
gsutil cp cp gs://mixpanel-sample-data/10-events.json s3://your-s3-bucket/example.json
监视Lambda功能的日志;你应该看到一个导入完整
一分钟内的日志线。
如果导航到CloudWatch日志,您将看到一个更详细的日志,其中包括正在导入的文件名和遇到的第一个错误(如果有)。
最后,让我们确认事件使它成为Mixpanel。前往实时视图, 挑选test_event
如果有选择器,您应该看到您刚导入的事件。
步骤4:导入更多数据
现在,我们已经准备好进口您自己的数据了。如果您的数据尚未达到MixPanel格式,那么这是编写转换步骤以作为Lambda的一部分运行的好时机。我们建议您在数据转换逻辑上进行本地测试,因为它比重新部署Lambda要快得多。这概述页面具有示例代码和数据以在本地测试。
准备好并使用几个小文件进行了测试后,您可以将导入的所有文件上传到S3存储桶中,并且导入将启动。可以通过定期将文件上传到S3存储库来重复出现。
我们建议在JSON的S3至200MB中将文件分区以获得最佳性能。
更新 7个月前