Google Pub/sub
本指南演示了如何将Mixpanel插入Google Cloud中托管的事件集合管道中。设置后,您的活动将路由到Mixpanel,并实时可用于分析。这种方法是无服务器和开源的,需要约5分钟的时间来设置。
注意:如果您使用AWS,则使用Kinesis和AWS Lambda非常相似。
步骤1:创建一个云酒吧/子主题
(如果您已经有一个酒吧/子主题,则可以跳过此步骤,其中有事件流过它。)
创建一个新的酒吧/子主题。所有最终将通往Mixpanel的事件都将流过此酒吧/子主题。
步骤2a:向您的酒吧/子主题添加云功能触发器
在此步骤中,我们设置了一个云功能,每当将事件推到您的酒吧/子主题时,我们就会触发。Google的文档详细介绍了此触发器的工作原理。
从您新创建的主题中,单击+触发云功能
。给云功能一个名称并保存。
步骤2B:编写云功能
将运行时间切换到Python3.9
并从hello_pubsub
至主要的
。粘贴以下代码以获取main..py和unignts.txt。
导入base64从dateTime导入日期import import import import import import import inturn imptim intim import import import import intim project_id =“”#www.jy710.com/project/ user ='',context):#假设该消息由JSON的行组成,每个线路都以MixPanel的格式msg = base64.b64decode(event [“ data”])。decode(“ utf-8”)events = [json.loads(line)for如果len(line)> 0]#调整每个示例事件的timestampps,则在msg.split(“ \ n”)中行(“ \ n”)中的行。注意:这仅适用于此演示,请勿在生产中进行#执行此操作。对于事件中的e:e [“ properties''事件).encode(“ utf-8”))当true:desp = requests.post(“ https://api.www.jy710.com/import”,params = {“ strict”:“ 1”,“ project_id”:project_id},auth =(user,pass),headers = {“ content-type”:“ application/json”,“ content-insoding”:“ gzip”,“ user-agent”:“ mixpanel-pubsub”},数据=有效载荷,)如果desp.status_code == 429或resp.status_code> = 500:#可重试错误,请使用指数向退回tries + = 1 time.sleep(min(2 ** tries,60) + randy.randint(1,1,1,randint)5))继续其他:如果desp.status_code!= 200:#log Failures。在生产中,写入死信的队列打印({“消息”:“导入失败”,“ error”:desp.json(),“严重性”:“ warn”})else:#debugging的log成功。删除生产。print({“消息”:“导入成功”,“ count”:desp.json()[“ num_records_imported”],“严重性”:“ info”,})
要求
验证
不要忘记填写
Project_ID
,,,,用户
,,,,经过
带有您的Project_ID和服务帐户凭据的变量。
该代码对来自传入的Pubsub消息的事件进行了非常简单的传递,导入API。您可以使用此功能将事件从酒吧/子主题转换为Mixpanel的格式,然后将其发送到我们的导入API。
步骤3:测试样本事件
现在,向我们的PubSub主题发布的消息将触发对MixPanel的功能和路线事件的调用。让我们通过手动通过PubSub UI手动发送消息来尝试一下。
在主题页面上,导航到消息 - >发布消息
。
然后将以下事件粘贴为消息主体。
{“ event”:“ test_event”,“ properties”:{“ distract_id”:“ sunrostern”,“ $ insert_id”:“ 28096095”,“标题”:“创建者赠品出版笔记”,“ url”,“ url”:“ https:” https:“ https:” https:“ https:”//www.viewert.com“,“得分”:“ 1”,“ TIME”:1628315585}}}} {“ event”:“ test_event”,“ properties”:{“ dimption_id”:“ feross”:“ feross”,“ $ insert_id”:“ 28059483”,“ title”:“`'at`for相对索引的方法,“ url”:“ https://v8.dev/features/at-method”,“得分”:“ 1”:“ 1”,“ TIME”:1628074042}}} {“ event”:“ test_event”,“ properties”:{“ distract_id”:“ prostoalex”,“ $ insert_id”:“ 28069645”,“ 28069645”,“标题”,“”:\ u2019re a卖点“,” url“:” https://www.wsj.com/articles/home-classrooms-covid-real-estate-11628100036“}}} {“ event”:“ test_event”,“ properties”:{“ distract_id”:“ bingewave”,“ $ insert_id”:“ 28063639”,“标题”:“构建直播电影应用程序和直播电视网站\ u2013ReactJS“,” url”:“ https://medium.com/bingewave/building-a-live-treaming-movie-app-live-tv-website-part-1-D0857AAAC8EA“,“得分”:“ 1”,“ TIME”:1628097439}}} {“ event”:“ test_event”,“ properties”:{“ distract_id”:““ 28063632”,“标题”:“数据可视化中的经典研究”,“ url”:“ https://observablehq.com/@tophtucker/classic-research-research-in-data-visaualization”,“得分”,“得分”:,,,,"time": 1628097398}} {"event": "test_event", "properties": {"distinct_id": "ivandiblasi68", "$insert_id": "28091778", "title": "Il sistema sanitario irlandese colpito da un ransomware", "url": "https://www.kaspersky.it/blog/irish-health-service-ransomware/24680/", "score": "1", "time": 1628278340}} {"event": "test_event", "properties": {"distinct_id": "akdav", "$insert_id": "28091776", "title": "DeFi and KYC", "url": "https://link.medium.com/bsn7z9Jmvib", "score": "1", "time": 1628278312}} {"event": "test_event", "properties": {"distinct_id": "webscraping99", "$insert_id": "28085411", "title": "LinkedIn Profile Detail Scrapers \u2013 Ahmad Software Technologies", "url": "https://ahmadsoftwaretechnologies3.mypixieset.com/linkedin-profile-detail-scrapers/", "score": "1", "time": 1628246831}} {"event": "test_event", "properties": {"distinct_id": "Dorimoody", "$insert_id": "28064346", "title": "Our Children and Our Citations: Each One, Both Together", "url": "https://www.plough.com/en/topics/life/work/our-children-and-our-citations-each-one-both-together", "score": "1", "time": 1628101041}} {"event": "test_event", "properties": {"distinct_id": "feross", "$insert_id": "28064342", "title": "The SEC Has Its Eye on Crypto", "url": "https://www.bloomberg.com/opinion/articles/2021-08-04/the-sec-has-its-eye-on-crypto", "score": "1", "time": 1628101019}}
一旦您发布
,该功能将触发并将上述有效载荷传递给云功能。一分钟之内,您应该看到进口成功
云功能日志中的日志行。
然后您可以导航到实时视图查看Mixpanel中的事件。
步骤4:连接生产管道
在这一点上,您可以以与上述相似的方式通过云功能从生产酒吧/子主题路由事件。简化修改云功能以将事件从内部事件格式转换为Mixpanel期望的格式。这也是剥离任何PII的好点。
连接后,这将导致稳定的事件发送到Mixpanel。愉快的流!
错误处理
在极少数情况下,MixPanel /导入API可能会返回429或5xx错误。这些可以安全地进行。如上所述,我们建议使用抖动策略进行指数级别。如果功能本身耗尽,则可以将PubSub配置为执行指数退回。
如果有效载荷畸形,我们的API可能会返回400错误。在这种情况下,不能摄入该项目,因此不应重述此错误。将这些消息纳入一个死信的标题上,以便以后进行检查。这可能在最初测试连接器时发生,但是在生产中应该很少见,假设流过管道的数据的形状保持一致。
Google有一个很棒的参考在处理可重试和不可退回错误的最佳实践中。
更新 7个月前