中易网

为什么使用mysql

答案:2  悬赏:70  
解决时间 2021-02-24 01:36
为什么使用mysql
最佳答案
免费 + 简单
全部回答
在开发 spark streaming 的公共组件过程中,需要将 binlog 的数据(array[byte])转换为 json 格式,供用户使用,本文提供一种转换的思路。另外我们会用到几个辅助类,为了行文流畅,我们将辅助类的定义放在文章的最后面。如果如果本文有讲述不详细,或者错误指出,肯请指出,谢谢对于 binlog 数据,每一次操作(insert/update/delete 等)都会作为一条记录写入 binlog 文件,但是同一条记录可能包含数据库中的几行数据(这里比较绕,可以看一个具体的例子)在数据库中,有 id, name 两个字段,其中 id 为主键,name 随意, age 随意。有两行数据如下idnameage   1john30   2john40   那么你进行操作   update table set age = 50 where name = john的时候,就会将两行的数据都进行更改,这两行更改的数据会在同一个 binlog 记录中,这一点会在后面的实现中有体现。   下面,我们给出具体的代码,然后对代码进行分析def desirializebyte(b: (string, array[byte])) : (string, string) = {val binlogentry = binlogentryutil.serializetobean(b._2) //将 array[byte] 数据转换成 com.meituan.data.binlog.binlogentry 类,相关类定义参考附录val pkeys = binlogentry.getprimarykeys.asscala //获取主键,这里的 asscala 将 java 的 list 转换为 scala 的 listval rowdatas : list[binlogrow] = binlogentry.getrowdatas.asscala.tolist //获取具体的信息val strrowdatas = rowdatas.map(a => { //将获取到的具体信息进行转换,这里主要是将没一条信息的内容,转换 [(k1:v1,k2:v2...kn:vn)] 的形式,方面后面进行 json 化val b = a.getbeforecolumns.asscala //获取 beforcolumnsval c = a.getaftercolumns.asscala //获取 aftercolumnsval mb = b.map(d => (d._1, d._2.getvalue)) //去掉所有不需要的信息,只保留每个字段的值val mc = c.map(c => (c._1, c._2.getvalue)) //去掉所有不需要的信息,只保留每个字段的值(mb, mc) //返回转换后的 beforecolumns 和 aftercolumns})   //下面利用 json4s 进行 json 化   (binlogentry.geteventtype, compact("rowdata" -> strrowdatas.map{w => list("row_data" -> ("before" -> w._1.tomap) ~ ("after" -> w._2.tomap)) //这里的两个 tomap 是必要的,不然里层会变成 list,这个地方比较疑惑的是,//w._1 按理是 map类型,为什么还需要强制转换成 map//而且用 strrowdatas.foreach(x => println(s"${x._1} ${x._2}")打印的结果表名是 map}))   desirializebyte 函数传入 topic 中的一条记录,返回参数自己确定,我这里为了测试,返回一个 (string, string) 的 tuple,第一个字段表示该条记录的 eventtype(insert/update/delete 等),第二个字段为 json 化后的数据。   binlogentryutil.serilizetobean 是一个辅助类,将 binlog 数据转化为一个 java bean 类。   第 4 行,我们得到表对应的主键,第 5 行获得具体的数据第 6 行到第 12 行是 json 化之前的辅助工作,将所有不需要的东西给剔除掉,只留下字段,以及字段对应的值。   第 14, 15 行就是具体的 json 工作了(使用了 json4s 包进行 json 化)这个过程中有一点需要注意的是,在 json 化的时候,记得为 w._1 和 w._2 加 tomap 操作,不然会变成 list(很奇怪,我将 w._1 和 w._2 打印出来看,都是 map 类型)或者你可以在第 7,8 行的末尾加上 .tomap 操作。这个我查了 api,进行了实验,暂时怀疑是在和 json4s 组合的时候,出现了问题,有待验证。   利用上述代码,我们可以得到下面这样 json 化之后的字符串(我进行了排版,程序返回的 json 串是不换行的){"rowdata":   [{"row_data":   {"before":{"param_name":"creator","param_value":"chenqiang05","horigindb_etl_id":"2532","utime":"2016-07-26 15:07:16","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"},"after":{"param_name":"creator","param_value":"chendayao","horigindb_etl_id":"2532","utime":"2016-08-01 10:32:01","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"}   }   }]   }"   到这里,基本就完成了一种将 binlog 数据 json 化的代码。   附录代码,由于这些代码是从其他工程里面抠出来的,可能读起来会不顺畅,还请见谅。   public static binlogentryserializetobean(byte[] input) {binlogentrybinlogentry = null;   entryentry = deserializefromprotobuf(input);//从 protobuf 反序列化if(entry != null) {   binlogentry = serializetobean(entry);   }   return binlogentry;   }   public static entrydeserializefromprotobuf(byte[] input) {entryentry = null;   try {   entry = entry.parsefrom(input);   //com.alibaba.otter.canal.protocol.canalentry#entry 类的方法,由 protobuf 生成} catch (invalidprotocolbufferexceptionvar3) {logger.error("exception:" + var3);   }   return entry;   }   //将 entry 解析为一个 bean 类   public static binlogentryserializetobean(entryentry) {rowchangerowchange = null;   try {   rowchange = rowchange.parsefrom(entry.getstorevalue());} catch (exceptionvar8) {   throw new runtimeexception("parse event has an error , data:" + entry.tostring(), var8);}   binlogentrybinlogentry = new binlogentry();string[] logfilenames = entry.getheader().getlogfilename().split("\\.");string logfileno = "000000";   if(logfilenames.length > 1) {   logfileno = logfilenames[1];   }   binlogentry.setbinlogfilename(logfileno);binlogentry.setbinlogoffset(entry.getheader().getlogfileoffset());binlogentry.setexecutetime(entry.getheader().getexecutetime());binlogentry.settablename(entry.getheader().gettablename());binlogentry.seteventtype(entry.getheader().geteventtype().tostring());iteratorprimarykeyslist = rowchange.getrowdataslist().iterator();while(primarykeyslist.hasnext()) {   rowdatarowdata = (rowdata)primarykeyslist.next();binlogrowrow = new binlogrow(binlogentry.geteventtype());row.setbeforecolumns(getcolumninfo(rowdata.getbeforecolumnslist()));row.setaftercolumns(getcolumninfo(rowdata.getaftercolumnslist()));binlogentry.addrowdata(row);   }   if(binlogentry.getrowdatas().size() >= 1) {binlogrowprimarykeyslist1 = (binlogrow)binlogentry.getrowdatas().get(0);binlogentry.setprimarykeys(getprimarykeys(primarykeyslist1));} else {   arraylistprimarykeyslist2 = new arraylist();binlogentry.setprimarykeys(primarykeyslist2);}   return binlogentry;   }   public class binlogentry implements serializable {private string binlogfilename;   private long binlogoffset;   private long executetime;   private string tablename;   private string eventtype;   private list<string> primarykeys;   private list<binlogrow> rowdatas = new arraylist();}   public class binlogrow implements serializable {public static final string event_type_insert = "insert";public static final string event_type_update = "update";public static final string event_type_delete = "delete";private string eventtype;   private map<string, binlogcolumn> beforecolumns;private map<string, binlogcolumn> aftercolumns;}   public class binlogcolumn implements serializable {private int index;   private string mysqltype;   private string name;   private boolean iskey;   private boolean updated;   private boolean isnull;   private string value;   }
我要举报
如以上问答内容为低俗、色情、不良、暴力、侵权、涉及违法等信息,可以点下面链接进行举报!
大家都在看
繁昌县龙湖建设工程有限公司在哪里啊,我有事
6月是什么属性
张家沟桥怎么去啊,有知道地址的么
为什么电喷车关了钥匙还要呆几秒才能灭车
WiFi,ping连接失败怎么办,加急,
有首歌:nuo sa,nuo sa I si no fever nuo s
幼儿和小学教师资格证哪个好考
纯音琴行NO.11地址在哪,我要去那里办事
《红楼梦》第六十三回中,麝月抽中的那支签是
左手玩悠悠球、转笔都行,右手这些都不会,左
儿童游乐设备分类 适合儿童玩的游乐设备有哪
百姓面馆在哪里啊,我有事要去这个地方
喜欢你的女生会因为见到你感到羞涩而不跟你打
为什么50米短跑不能呼吸
天地人和水族地址在什么地方,想过去办事
推荐资讯
浙江卫视高能少年团什么时候播出
泰拉瑞亚大飞龙好打还是机器蠕虫好打
如何回报父母的养育之恩
的的快车为什么会自动在花呗扣费
园前路地址在什么地方,想过去办事
陌生的富豪,向一对缺钱的年轻夫妻提出条件:
为什么在中国,东边人口多,西边人口少?
慈溪职高单考单招是不是每个班都可以考
樟松和铁杉怎么区别
雄祥物流在哪里啊,我有事要去这个地方
在输尿管结石下段微创手术有危险吗
鬼泣3中但丁的剑是霜之哀伤吗?
手机登qq时,显示手机磁盘不足,清理后重新登
刺客的套装怎么选啊?