```
SET TIMESTAMP=1527917394/*!*/;
BEGIN
/*!*/;
# at 3751
#180602 13:29:54 server id 1  end_log_pos 3819 CRC32 0x8dabdf01  Table_map: `webservice`.`building` mapped to number 74
# at 3819
#180602 13:29:54 server id 1  end_log_pos 3949 CRC32 0x59a8ed85  Update_rows: table id 74 flags: STMT_END_F

BINLOG '
UisSWxMBAAAARAAAAOsOAAAAAEoAAAAAAAEACndlYnNlcnZpY2UACGJ1aWxkaW5nAAYIDwEPEREGwACAAQAAAAHfq40=
UisSWx8BAAAAggAAAG0PAAAAAEoAAAAAAAEAAgAG///A1gcAAAAAAAALYnVpbGRpbmctMTAADwB3UkRNbjNLYlV5d1k3ajVbD64WWw+uFsDWBwAAAAAAAAtidWlsZGluZy0xMAEPAHdSRE1uM0tiVXl3WTdqNVsPrhZbD64Whe2oWQ==
'/*!*/;
### UPDATE `webservice`.`building`
### WHERE
###   @1=2006 /* LONGINT meta=0 nullable=0 is_null=0 */
###   @2='building-10' /* VARSTRING(192) meta=192 nullable=0 is_null=0 */
###   @3=0 /* TINYINT meta=0 nullable=0 is_null=0 */
###   @4='wRDMn3KbUywY7j5' /* VARSTRING(384) meta=384 nullable=0 is_null=0 */
###   @5=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
###   @6=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
### SET
###   @1=2006 /* LONGINT meta=0 nullable=0 is_null=0 */
###   @2='building-10' /* VARSTRING(192) meta=192 nullable=0 is_null=0 */
###   @3=1 /* TINYINT meta=0 nullable=0 is_null=0 */
###   @4='wRDMn3KbUywY7j5' /* VARSTRING(384) meta=384 nullable=0 is_null=0 */
###   @5=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
###   @6=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
# at 3949
#180602 13:29:54 server id 1  end_log_pos 3980 CRC32 0x58226b8f  Xid = 182
COMMIT/*!*/;
```
With statement-based logging (binlog_format=STATEMENT), by contrast, the same kind of update is recorded as the original SQL statement rather than as row images:

```
SET TIMESTAMP=1527919329/*!*/;
update building set Status=1 where Id=2000
/*!*/;
# at 688
#180602 14:02:09 server id 1  end_log_pos 719 CRC32 0x4c550a7d  Xid = 200
COMMIT/*!*/;
```
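Tools that replicate MySQL data in real time work by decoding exactly these events. As an illustration only (not part of this article's toolchain), here is a minimal sketch using the third-party python-mysql-replication package; the host and credentials mirror the examples in this article and are assumptions:

```python
# Minimal sketch: stream row-based binlog events from MySQL, i.e. the same
# Update_rows events shown above. Assumes `pip install mysql-replication`
# and a user with REPLICATION SLAVE / REPLICATION CLIENT privileges.
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import UpdateRowsEvent

stream = BinLogStreamReader(
    connection_settings={
        "host": "172.16.32.5",   # source MySQL from the examples above
        "port": 3306,
        "user": "elastic",
        "passwd": "Elastic_123",
    },
    server_id=1001,              # pseudo slave id, must be unique
    only_schemas=["webservice"],
    only_tables=["building"],
    only_events=[UpdateRowsEvent],
    blocking=True,               # keep waiting for new events
)

for event in stream:
    for row in event.rows:
        # row["before_values"] / row["after_values"] correspond to the
        # WHERE / SET blocks in the decoded binlog output above.
        print(row["before_values"], "->", row["after_values"])
```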
mysqldump is a tool for taking a full export of the data in a MySQL database. It is invoked like this:

```bash
mysqldump -uelastic -p'Elastic_123' --host=172.16.32.5 -F webservice > dump.sql
```

This command exports all data of the database `webservice` from the instance at 172.16.32.5:3306 and writes it to the file dump.sql. The -F (--flush-logs) flag rotates to a fresh binlog file as part of the dump, so that all subsequent data operations are recorded in the new file. The contents of dump.sql look like this:

```sql
-- MySQL dump 10.13  Distrib 5.6.40, for Linux (x86_64)
--
-- Host: 172.16.32.5    Database: webservice
-- ------------------------------------------------------
-- Server version       5.5.5-10.1.9-MariaDBV1.0R012D002-20171127-1822

/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;

--
-- Table structure for table `building`
--

DROP TABLE IF EXISTS `building`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `building` (
  `Id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `BuildingId` varchar(64) NOT NULL COMMENT '虚拟建筑Id',
  `Status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '虚拟建筑状态:0、处理中;1、正常;-1,停止;-2,销毁中;-3,已销毁',
  `BuildingName` varchar(128) NOT NULL DEFAULT '' COMMENT '虚拟建筑名称',
  `CreateTime` timestamp NOT NULL DEFAULT '2017-12-03 16:00:00' COMMENT '创建时间',
  `UpdateTime` timestamp NOT NULL DEFAULT '2017-12-03 16:00:00' COMMENT '更新时间',
  PRIMARY KEY (`Id`),
  UNIQUE KEY `BuildingId` (`BuildingId`)
) ENGINE=InnoDB AUTO_INCREMENT=2010 DEFAULT CHARSET=utf8 COMMENT='虚拟建筑表';
/*!40101 SET character_set_client = @saved_cs_client */;

--
-- Dumping data for table `building`
--

LOCK TABLES `building` WRITE;
/*!40000 ALTER TABLE `building` DISABLE KEYS */;
INSERT INTO `building` VALUES (2000,'building-2',0,'6YFcmntKrNBIeTA','2018-05-30 13:28:31','2018-05-30 13:28:31'),(2001,'building-4',0,'4rY8PcVUZB1vtrL','2018-05-30 13:28:34','2018-05-30 13:28:34'),(2002,'building-5',0,'uyjHVUYrg9KeGqi','2018-05-30 13:28:37','2018-05-30 13:28:37'),(2003,'building-7',0,'DNhyEBO4XEkXpgW','2018-05-30 13:28:40','2018-05-30 13:28:40'),(2004,'building-1',0,'TmtYX6ZC0RNB4Re','2018-05-30 13:28:43','2018-05-30 13:28:43'),(2005,'building-6',0,'t8YQcjeXefWpcyU','2018-05-30 13:28:49','2018-05-30 13:28:49'),(2006,'building-10',0,'WozgBc2IchNyKyE','2018-05-30 13:28:55','2018-05-30 13:28:55'),(2007,'building-3',0,'yJk27cmLOVQLHf1','2018-05-30 13:28:58','2018-05-30 13:28:58'),(2008,'building-9',0,'RSbjotAh8tymfxs','2018-05-30 13:29:04','2018-05-30 13:29:04'),(2009,'building-8',0,'IBOMlhaXV6k226m','2018-05-30 13:29:31','2018-05-30 13:29:31');
/*!40000 ALTER TABLE `building` ENABLE KEYS */;
UNLOCK TABLES;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

-- Dump completed on 2018-06-02 14:23:51
```
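What -F buys you is a clean hand-off point between the full export and the binlog: everything up to the dump is in dump.sql, and everything after it lands in the freshly rotated binlog file. A rough sketch of capturing both pieces, assuming the pymysql package and the same credentials as above (an illustration, not part of the original tooling):

```python
# Sketch: take a full dump with -F, then record which binlog file
# incremental changes start in. Assumes `pip install pymysql`.
import subprocess

import pymysql

# Full export; -F rotates to a fresh binlog file as part of the dump.
with open("dump.sql", "w") as out:
    subprocess.run(
        ["mysqldump", "-uelastic", "-pElastic_123",
         "--host=172.16.32.5", "-F", "webservice"],
        stdout=out, check=True,
    )

# Changes made after the rotation land in the new binlog file; replay it
# from the beginning to pick up where dump.sql leaves off.
conn = pymysql.connect(host="172.16.32.5", user="elastic",
                       password="Elastic_123")
with conn.cursor() as cur:
    cur.execute("SHOW MASTER STATUS")
    binlog_file = cur.fetchone()[0]
print("incremental sync starts from", binlog_file)
conn.close()
```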
go-mysql-elasticsearch works as follows: on first startup, it uses the mysqldump tool to take a one-time full export of the source MySQL database and writes the data into ES through an Elasticsearch client; it then implements a MySQL client that connects to the source MySQL as a slave. The source MySQL, acting as master, pushes every subsequent data change to the slave as binlog events; by parsing those events, the tool extracts the changed rows and writes them into ES.

The account it connects with therefore needs replication privileges on the source MySQL, plus RELOAD for the mysqldump stage:

```sql
GRANT REPLICATION SLAVE ON *.* TO 'elastic'@'172.16.32.44';
GRANT RELOAD ON *.* TO 'elastic'@'172.16.32.44';
```
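The incremental half of that principle can be sketched in a few lines. This is an illustration of the idea, not go-mysql-elasticsearch's actual code; it assumes the mysql-replication and requests packages, and the index/type names come from the river.toml configuration shown later in this article:

```python
# Illustration of the principle: replay binlog row events into ES,
# using the table's primary key as the document id.
# Assumes `pip install mysql-replication requests`.
import json

import requests
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

ES_DOC = "http://172.16.32.64:9200/building/buildingtype"

stream = BinLogStreamReader(
    connection_settings={"host": "172.16.0.101", "port": 3306,
                         "user": "elastic", "passwd": "Elastic_123"},
    server_id=1001,  # pseudo slave id
    only_schemas=["webservice"],
    only_tables=["building"],
    only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
    blocking=True,
)

for event in stream:
    for row in event.rows:
        if isinstance(event, DeleteRowsEvent):
            requests.delete(f"{ES_DOC}/{row['values']['Id']}")
        else:
            # Inserts carry "values"; updates carry "after_values".
            doc = (row["after_values"] if isinstance(event, UpdateRowsEvent)
                   else row["values"])
            requests.put(
                f"{ES_DOC}/{doc['Id']}",
                data=json.dumps(doc, default=str),  # timestamps -> strings
                headers={"Content-Type": "application/json"},
            )
```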
To use go-mysql-elasticsearch:

1. Fetch the source: `go get github.com/siddontang/go-mysql-elasticsearch`.
2. Enter the source directory: `cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch`.
3. Run `make` to build. On success, an executable named `go-mysql-elasticsearch` is generated in the `go-mysql-elasticsearch/bin` directory.
4. Run `vi etc/river.toml` and edit the configuration to sync the `webservice`.`building` table of the database at 172.16.0.101:3306 to the building index of the ES cluster at 172.16.32.64:9200 (see the project documentation for a more detailed description of the configuration file):

   ```toml
   # MySQL address, user and password
   # user must have replication privilege in MySQL.
   my_addr = "172.16.0.101:3306"
   my_user = "bellen"
   my_pass = "Elastic_123"
   my_charset = "utf8"

   # Set true when elasticsearch use https
   #es_https = false
   # Elasticsearch address
   es_addr = "172.16.32.64:9200"
   # Elasticsearch user and password, maybe set by shield, nginx, or x-pack
   es_user = ""
   es_pass = ""

   # Path to store data, like master.info, if not set or empty,
   # we must use this to support breakpoint resume syncing.
   # TODO: support other storage, like etcd.
   data_dir = "./var"

   # Inner Http status address
   stat_addr = "127.0.0.1:12800"

   # pseudo server id like a slave
   server_id = 1001

   # mysql or mariadb
   flavor = "mariadb"

   # mysqldump execution path
   # if not set or empty, ignore mysqldump.
   mysqldump = "mysqldump"

   # if we have no privilege to use mysqldump with --master-data,
   # we must skip it.
   #skip_master_data = false

   # minimal items to be inserted in one bulk
   bulk_size = 128

   # force flush the pending requests if we don't have enough items >= bulk_size
   flush_bulk_time = "200ms"

   # Ignore table without primary key
   skip_no_pk_table = false

   # MySQL data source
   [[source]]
   schema = "webservice"
   tables = ["building"]

   [[rule]]
   schema = "webservice"
   table = "building"
   index = "building"
   type = "buildingtype"
   ```
5. Start the program: `./bin/go-mysql-elasticsearch -config=./etc/river.toml`. Startup logs look like this:

   ```
   2018/06/02 16:13:21 INFO create BinlogSyncer with config {1001 mariadb 172.16.0.101 3306 bellen utf8 false false <nil> false false 0 0s 0s 0}
   2018/06/02 16:13:21 INFO run status http server 127.0.0.1:12800
   2018/06/02 16:13:21 INFO skip dump, use last binlog replication pos (mysql-bin.000001, 120) or GTID %!s(<nil>)
   2018/06/02 16:13:21 INFO begin to sync binlog from position (mysql-bin.000001, 120)
   2018/06/02 16:13:21 INFO register slave for master server 172.16.0.101:3306
   2018/06/02 16:13:21 INFO start sync binlog at binlog file (mysql-bin.000001, 120)
   2018/06/02 16:13:21 INFO rotate to (mysql-bin.000001, 120)
   2018/06/02 16:13:21 INFO rotate binlog to (mysql-bin.000001, 120)
   2018/06/02 16:13:21 INFO save position (mysql-bin.000001, 120)
   ```
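Once it is running, a quick way to confirm that the rows reached the target index is to query ES directly. A small spot-check, assuming the requests package (curl against the same endpoint works just as well):

```python
# Spot-check: fetch a couple of documents from the building index.
import requests

resp = requests.get("http://172.16.32.64:9200/building/_search",
                    params={"size": 2})
resp.raise_for_status()
for hit in resp.json()["hits"]["hits"]:
    print(hit["_id"], hit["_source"])
```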
go-mysql-elasticsearch implements the most basic real-time syncing of MySQL data into ES. If the business needs deeper functionality, such as allowing the MySQL table structure to be modified while syncing is running, it can be extended with custom development.

The next tool, mypipe, also works by connecting to the source MySQL as a slave, but it targets Kafka rather than ES. First create a MySQL account with replication privileges for it:

```sql
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'elastic'@'%' IDENTIFIED BY 'Elastic_123'
```
mypipe itself only parses the binlog events, encodes them in Avro format, and pushes them to a Kafka topic; getting the data into an ES cluster means consuming that topic, decoding the messages, and writing the rows to ES yourself. For consumption, mypipe provides the KafkaGenericMutationAvroConsumer class, which can be subclassed and used directly; alternatively, you can parse the messages on your own. To use mypipe:

1. Clone the repository: `git clone https://github.com/mardambey/mypipe.git`.
2. Build: `./sbt package`.
3. Edit `mypipe-runner/src/main/resources/application.conf`:

   ```
   mypipe {

     # Avro schema repository client class name
     schema-repo-client = "mypipe.avro.schema.SchemaRepo"

     # consumers represent sources for mysql binary logs
     consumers {
       localhost {
         # database "host:port:user:pass" array
         source = "172.16.0.101:3306:elastic:Elastic_123"
       }
     }

     # data producers export data out (stdout, other stores, external services, etc.)
     producers {
       kafka-generic {
         class = "mypipe.kafka.producer.KafkaMutationGenericAvroProducer"
       }
     }

     # pipes join consumers and producers
     pipes {
       kafka-generic {
         enabled = true
         consumers = ["localhost"]
         producer {
           kafka-generic {
             metadata-brokers = "172.16.16.22:9092"
           }
         }
         binlog-position-repo {
           # saved to a file, this is the default if unspecified
           class = "mypipe.api.repo.ConfigurableFileBasedBinaryLogPositionRepository"
           config {
             file-prefix = "stdout-00" # required if binlog-position-repo is specified
             data-dir = "/tmp/mypipe/data" # defaults to mypipe.data-dir if not present
           }
         }
       }
     }
   }
   ```
4. Edit `mypipe-api/src/main/resources/reference.conf`, changing the `include-event-condition` option to specify the database and table to sync:

   ```
   include-event-condition = """ db == "webservice" && table == "building" """
   ```
5. Create the topic `webservice_building_generic` in Kafka; by default mypipe uses `${db}_${table}_generic` as the topic name and sends data to that topic.
6. Run mypipe: `./sbt "project runner" "runMain mypipe.runner.PipeRunner"`.
7. Update a row in the `webservice`.`building` table, then consume from the topic to verify; the consumed record carries the mutation as raw Avro-encoded bytes:

   ```
   ConsumerRecord(topic=u'webservice_building_generic', partition=0, offset=2, timestamp=None, timestamp_type=None, key=None, value='\x00\x01\x00\x00\x14webservice\x10building\xcc\x01\x02\x91,\xae\xa3fc\x11\xe8\xa1\xaaRT\x00Z\xf9\xab\x00\x00\x04\x18BuildingName\x06xxx\x14BuildingId\nId-10\x00\x02\x04Id\xd4%\x00', checksum=128384379, serialized_key_size=-1, serialized_value_size=88)
   ```
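A record like the one above can be read with a plain Kafka consumer; here is a minimal sketch assuming the kafka-python package, with the broker address taken from application.conf. The Avro payload is printed raw, exactly as in the output above; decoding it requires the mutation's Avro schema, or mypipe's KafkaGenericMutationAvroConsumer on the JVM side:

```python
# Minimal sketch: read mypipe's Avro-encoded mutations from Kafka.
# Assumes `pip install kafka-python`.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "webservice_building_generic",
    bootstrap_servers="172.16.16.22:9092",
    auto_offset_reset="earliest",
)

for record in consumer:
    # record.value holds the raw Avro-encoded mutation bytes; decode them
    # with the generic mutation schema, or do the decoding in a subclass
    # of mypipe's KafkaGenericMutationAvroConsumer instead.
    print(record)
```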