Google云存储
本指南演示了如何将Google云存储存储桶中的无服务器摄入管道设置为Mixpanel。设置此设置后,您可以简单地将包含事件的文件上传到指定的GCS存储桶中,并将事件摄入到一次性和经常性的Mixpanel中。设置应花费约10分钟。
先决条件
本指南假设您正在Google Cloud Platform中运行,并且具有必要的IAM访问权限,以使GCS读取云功能。
步骤1:创建一个GCS存储桶
创建一个专用的云存储存储桶为此集成。我们建议包括Mixpanel-Import
用名称使其明确并避免任何意外数据共享。
您可以在任何区域中创建水桶,尽管我们建议美国中央
最高吞吐量。
步骤2a:设置云功能
创建一个新的云功能。
- 将扳机设置为
云储存
,事件类型最终确定/创建
以及在步骤2中创建的存储桶的存储桶名称,这意味着上传到此存储桶的任何对象都会触发此云功能的调用。 - 将内存设置为1GIB,超时为300,并将实例设置为20
步骤2B:编写云功能
将运行时间切换到Python3.9
并从hello_gcs
至主要的
。粘贴以下代码main.py
和需求.txt
。
从dateTime导入日期导入gzip导入json导入随机导入时间从google.cloud导入storage import import import project project_id =“”#www.jy710.com/project/ user ='#批量冲洗一旦这些限制击中events_per_batch = 2000 bytes_per_batch = 2 * 1024 * 1024#调整测试数据的事件时间戳,以便在Mixpanel中显示最近的时间。#注意:这仅用于演示目的。#用自己的生产转换替换。def transform_to_event(line):“”““将文件的线转换为Mixpanel格式中的JSON字符串。今天()。strftime(“%s”))返回json.dumps(event)def flush(batch):pareload = gzip.compress(“ \ n” .join(batch).encode(“ utf-8”))tries = 0 while true:wess = requests.post(“ https://api.www.jy710.com/import”,params = {“ strict”:“ 1”,“ project_id”:project_id},headers = {“ content-type“:“应用程序/x-ndjson”,“ content-insoding”:“ gzip”,“ user-agent”:“ mixpanel-gcs”},auth =(user,pass),data =有效载荷等)status_code == 429或resp.status_code> = 500:time.sleep(min(2 ** tries,60) + andant.randint.randint(1,5))尝试 + = 1继续返回revers def def main(请求,上下文):gcs = storage.client()bucket = gcs.get_bucket(请求[“ bucket”])blob = bucket.get_blob(request [“ name”])compresse = request [request [name'name'']。= none batch,batch_bytes = [],0#为方便起见,请检查以.gz f = blob.open(“ rb”)结尾的文件,如果请求[“名称”]。endSwith(“。gz”)和request.get(“ contentEncoding”)!=“ gzip”:f = gzip.open(f)for f:f:######这里我们假设每行都是有效的 JSON representing a single Mixpanel event # If lines are not in Mixpanel's format, apply any transformations here. line = transform_to_event(line) batch.append(line) batch_bytes += len(line) if len(batch) == EVENTS_PER_BATCH or batch_bytes >= BYTES_PER_BATCH: resp = flush(batch) batch, batch_bytes = [], 0 if resp.status_code != 200: error = resp.json() break # Flush final batch if batch and not error: resp = flush(batch) if resp.status_code != 200: error = resp.json() f.close() print( json.dumps( { "message": "Import complete", "request": request, "success": error is None, "first_error": error, "severity": "INFO", } ) )
Google-Cloud-Storage请求
验证
不要忘记填写
Project_ID
,,,,用户
,,,,经过
带有您的Project_ID和服务帐户凭据的变量。
每当将新对象添加到您的存储桶中时,此代码都会触发。它在新添加的对象上流式传输,假设每行都是Mixpanel的事件格式中的Newline划界JSON。然后,它形成批处理并击中我们的 /导入API,重试任何可重试错误。它在完成时记录了一条消息。如果由于验证错误而失败,则该日志包括遇到的第一个不可退回错误。
点击部署
部署功能。在这一点上,您上传到存储桶的任何文件都会触发该功能的调用,并将导入到Mixpanel中。让我们测试一下!
步骤3:使用示例数据测试并观看日志
让我们测试一些连接样本事件。运行以下内容将它们复制到您的存储桶中,这将触发导入:
gsutil cp gs://mixpanel-sample-data/10-events.json gs://
监视云功能的日志;你应该看到一个导入完整
一分钟内的日志线。
如果导航到StackDriver日志,您将看到一个更详细的日志,其中包括正在导入的文件名和遇到的第一个错误(如果有)。
最后,让我们确认事件使它成为Mixpanel。前往实时视图, 挑选test_event
如果有选择器,您应该看到您刚导入的事件。
步骤4:导入更多数据
现在,我们已经准备好进口您自己的数据了。如果您的数据尚未处于MixPanel格式,则是编写转换步骤以作为云功能的一部分运行的好时机。我们建议您在数据转换逻辑上进行本地测试,因为它比重新部署云功能要快得多。这概述页面具有示例代码和数据以在本地测试。
准备好并使用几个小文件进行了测试后,您可以将导入的所有文件上传到GCS存储桶中,并且导入将启动。可以通过定期将文件上传到GCS存储库来重复出现。
我们建议将GC中的文件分区至〜200MB的JSON,以获得最佳性能。
更新 7个月前