亚马逊S3

本指南演示了如何将无服务器的摄入管道从AWS S3存储桶中设置为Mixpanel。设置此设置后,您可以简单地将包含事件的文件上传到指定的S3存储桶中,并将事件摄入到一次性和经常性的Mixpanel中。设置应花费约5-10分钟。

1658年1658年

先决条件

本指南假设您正在亚马逊网络服务中运行,并且具有必要的IAM访问权限,可以从S3中读取AWS lambda。

步骤1:创建一个S3存储桶

创建一个专用的S3存储桶为此集成。我们建议包括Mixpanel-Import用名称使其明确并避免任何意外数据共享。

您可以在任何区域中创建水桶,尽管它需要与您的AWS lambda函数在同一区域。

步骤2a:设置lambda功能

创建一个新的lambda功能

  • 选择作者从头开始
  • 将运行时间设置为Python 3.7
  • 选择创建功能
22762276

lambda功能配置

步骤2B:编写lambda功能

更改文件名lambda_function.pymain.py并编辑运行时设置处理程序Main.Handler。粘贴以下代码main.py

16101610
从dateTime导入日期从io导入textiowrapper导入gzip import import import import import import import import import urllib.parse导入import os import os import boto3 import project_id =“”#www.jy710.com/project/  user = user =“”#“”#服务帐户密码#一旦击中这些限制,将这些限制点击events_per_batch = 2000 bytes_per_batch = 2 * 1024 * 1024 * 1024#调整测试数据的事件时间戳,以使其在Mixpanel中显示出最新的。#注意:这仅用于演示目的。#用自己的生产转换替换。def transform_to_event(line):“”““将文件的线转换为Mixpanel格式中的JSON字符串。今天()。strftime(“%s”))返回json.dumps(event)def flush(batch):pareload = gzip.compress(“ \ n” .join(batch).encode(“ utf-8”))tries = 0 while true:wess = requests.post(“ https://api.www.jy710.com/import”,params = {“ strict”:“ 1”,“ project_id”:project_id},headers = {“ content-type“:”应用程序/x-ndjson,“ content-insoding”:“ gzip”,“ user-agent”:“ mixpanel-s3”},auth =(user,pass),data = pareload,如果有swess。status_code == 429或resp.status_code> = 500:time.sleep(min(2 ** tries,60) + andant.randint.randint(1,5))尝试 + = 1继续返回revers def def处理程序(事件,上下文):s3 = boto3.client('s3')bucket = event ['records'] [0] ['s3'] ['bucket'] ['name'][0] ['s3'] ['object'] ['键'],编码='utf-8')blob = s3.get_object(buck_object(bucket = bucket = key = key),如果blob ['wonsevensemetadata'] [''] [''''httpheaders'] ['content-type'] =='应用程序/x-gzip':gzipped = gzip.gzipfile(none,'rb',file obj = blob ['hody'])data = textiowrapper(gzpipped).read(read).read()blob ['body']。read()。解码('utf-8')。splitlines()error = note batch,batch_bytes = [],数据中的line:#for data:#在这里我们假设每行都是有效的json代表一个 single Mixpanel event # If lines are not in Mixpanel's format, apply any transformations here. line = transform_to_event(line) batch.append(line) batch_bytes += len(line) if len(batch) == EVENTS_PER_BATCH or batch_bytes >= BYTES_PER_BATCH: resp = flush(batch) batch, batch_bytes = [], 0 if resp.status_code != 200: error = resp.json() break # Flush final batch if batch and not error: resp = flush(batch) if resp.status_code != 200: error = resp.json() print( json.dumps( { "message": "Import complete", "request": event, "success": error is None, "first_error": error, "severity": "INFO", } ) )

验证

不要忘记填写Project_ID,,,,用户,,,,经过带有您的Project_ID和服务帐户凭据的变量。

步骤2C:添加触发器

添加触发器,以便每当将新对象添加到您的存储桶中时,您的lambda函数就可以运行添加触发器在下面功能概述。您将需要使用步骤2中创建的存储桶,然后选择所有对象创建事件

16081608

步骤2D:更新配置和部署

选择配置>一般配置>编辑

  • 改变记忆1024 MB
  • 改变暂停5分钟
1606年1606年

点击版本>发布新版本部署功能。在这一点上,您上传到存储桶的任何文件都会触发该功能的调用,并将导入到Mixpanel中。让我们测试一下!

步骤3:使用示例数据测试并观看日志

让我们测试一些连接样本事件。运行以下内容将它们复制到您的存储桶中,这将触发导入:

gsutil cp cp gs://mixpanel-sample-data/10-events.json s3://your-s3-bucket/example.json

监视Lambda功能的日志;你应该看到一个导入完整一分钟内的日志线。

26802680

云功能日志

如果导航到CloudWatch日志,您将看到一个更详细的日志,其中包括正在导入的文件名和遇到的第一个错误(如果有)。

1512年1512年

最后,让我们确认事件使它成为Mixpanel。前往实时视图, 挑选test_event如果有选择器,您应该看到您刚导入的事件。

26962696

您可以在LiveView中扩展任何事件,以查看其所有属性。

步骤4:导入更多数据

现在,我们已经准备好进口您自己的数据了。如果您的数据尚未达到MixPanel格式,那么这是编写转换步骤以作为Lambda的一部分运行的好时机。我们建议您在数据转换逻辑上进行本地测试,因为它比重新部署Lambda要快得多。这概述页面具有示例代码和数据以在本地测试。

准备好并使用几个小文件进行了测试后,您可以将导入的所有文件上传到S3存储桶中,并且导入将启动。可以通过定期将文件上传到S3存储库来重复出现。

我们建议在JSON的S3至200MB中将文件分区以获得最佳性能。


这个页面对你有帮助吗?