Data syncing between MySQL and Elasticsearch using go-mysql-elasticsearch

by Adarsh M S on Fri May 29

In this data-driven age, whether you are building a small-scale or a large-scale application, one of the main ingredients is data. These essential blobs are housed in a NoSQL or SQL database, depending on the data’s nature and the developer’s choice.

Recently, a colleague and I were working on a scoring system that scores users based on different criteria. Since our data was highly relational, we chose a MySQL server for warehousing it. After some brainstorming and idea sharing, we were able to lay out an effective schema that aligned with our data and requirements.

After implementing the algorithms and other supporting data pipelines, we ran some benchmark tests. One of the inferences we made was that our engine would require an auto-correction/completion feature, since data coming from a wide user base tends to contain typos, abbreviations, etc. We chose Elasticsearch (a great tool for storing and querying large text datasets) for creating the search space.

Elasticsearch is a flexible, powerful, open-source, distributed, real-time search and analytics engine. It is fast, easy to use, and useful in many use cases. To implement such a search space, it was necessary to stream data from the MySQL databases to ES indices. MySQL dumps can be used to seed the indices the first time, but the real challenge was that the data was susceptible to change, so we needed to establish a continuous flow of data between the MySQL databases and the ES indices.

Thus began our hunt for such a mechanism on the internet, and we came across some really incredible projects. One of them was the ES stack’s own Logstash; though it seemed promising, it couldn’t fulfill all our requirements. And then we came across an awesome service built with Go.

Go-mysql-elasticsearch is a service that syncs your MySQL data into Elasticsearch automatically. It uses mysqldump to fetch the original data first, then syncs data incrementally with the binlog. So, now let’s get down to business and set this up.

Prerequisites

  • Go (1.9+)
  • MySQL (< 8.0) [local or cloud instance]
  • Elasticsearch (<6.0)

If your environment is not configured with these requirements, follow the official installation instructions for each. Although the developer suggests using MySQL < 8.0 and ES < 6.0, we implemented it with MySQL 5.7 and ES 7.7 and it worked without any issues.
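
As a quick sanity check, you can verify what is already installed before proceeding (these commands assume a local setup with Elasticsearch on its default port; adjust for cloud instances):

go version
mysql --version
curl -s http://localhost:9200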

A river is an easy way to set up a continuous stream of data flowing into your Elasticsearch data warehouse. It’s a plug-in service that operates within the Elasticsearch cluster, extracting data (or being pushed data) that is then indexed in the cluster. It is more practical than manually indexing data, because once configured, all data is updated automatically. This reduces complexity and also enables the creation of a real-time system. In this article, I’ll explain how to establish a MySQL river for Elasticsearch.

Installing MySQL river

Run the following command from your terminal to fetch the project

go get github.com/siddontang/go-mysql-elasticsearch

Note: It may print a message; you can ignore it. The default download location is $HOME/go. If you want to use a custom location, configure it with the GOPATH variable in your environment.

Add the following line to your ~/.bashrc file
export GOPATH=/path/to/destination/folder
Now run the following command to update the system variables
source ~/.bashrc

Now the project will be downloaded and installed into $GOPATH, which contains a directory src/github.com/siddontang/go-mysql-elasticsearch/, along with the compiled packages (in pkg/) for those libraries and their dependencies.

Change to the project directory

cd $HOME/go/src/github.com/siddontang/go-mysql-elasticsearch
-- or --
cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch

Let’s build the executable for the program. Run the following command in your terminal

make

Using MySQL river

Configure MySQL

  • Configure your MySQL binlog to use ROW format

Run the following command from your mysql shell

SET GLOBAL binlog_format = 'ROW';
-- or -- For individual clients, use the command below
SET SESSION binlog_format = 'ROW';
  • Create your MySQL database and tables.

Note: Any MySQL table that will be synced should have a PK (primary key); multi-column PKs are also allowed.
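
For illustration, a minimal table that satisfies this requirement and matches the dbx/tba examples used later in this post might look like the following (the database, table, and column names are placeholders):

CREATE DATABASE IF NOT EXISTS dbx;

CREATE TABLE dbx.tba (
    id INT NOT NULL AUTO_INCREMENT,
    col_a VARCHAR(255),
    PRIMARY KEY (id)
);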

Configure Elasticsearch

Create the associated Elasticsearch index, document type, and mappings if possible; if not, Elasticsearch will create these automatically.

Note: If the ES index is not configured, the default mappings will be used. Our use case required accurate search results, so we created the index with custom mappings.
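
As a rough sketch, creating such an index with a custom mapping on ES 7.x takes a single request (the index name tb_index and the name field are taken from the rule examples later in this post; your own mappings and analyzers will differ):

curl -X PUT "localhost:9200/tb_index" -H 'Content-Type: application/json' -d'
{
  "mappings": {
    "properties": {
      "name": { "type": "text" }
    }
  }
}'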

Configure River

The mysql-es configurations are provided in the river.toml file under
$GOPATH/src/github.com/siddontang/go-mysql-elasticsearch/etc

Change working directory to

cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch/etc

Open the river.toml config in your preferred editor

nano river.toml

MySQL config

# MySQL address, user and password
# user must have replication privilege in MySQL.
my_addr = "127.0.0.1:3306"
my_user = "root"
my_pass = ""
my_charset = "utf8" 

You can also use cloud-hosted MySQL services like RDS. Now specify the flavor of your database with the flavor param.

# mysql or mariadb
flavor = "mysql"

Go-mysql-elasticsearch uses mysqldump to fetch the original data first, then syncs data incrementally with the binlog. So, if you want to index the existing data as well, specify the mysqldump execution path with the following config; if not, just comment it out or leave it empty.

# mysqldump execution path
# if not set or empty, ignore mysqldump.
mysqldump = "./var/mysqldump"
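
If you are unsure where mysqldump is installed on your machine, you can locate it with

which mysqldump

and use that path here.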

Note: We had some issues while trying to restore the dump from RDS, so if you are using an RDS instance, check the takeaway section of this post to learn more about this issue.

ES config

# Set true when elasticsearch use https
#es_https = false
# Elasticsearch address
es_addr = "127.0.0.1:9200"
# Elasticsearch user and password, maybe set by shield, nginx, or x-pack
es_user = ""
es_pass = ""

Database config

Let’s say we have a database named dbx with tables tba, tbb, tbc … To sync this database and the tables of interest, we need to specify them under the source config

# MySQL data source
[[source]]
schema = "dbx"
tables = ["tba", "tbb"]

If you want to sync all the tables in a db, just use the "*" wildcard.

# MySQL data source
[[source]]
schema = "dbx"
tables = ["*"]

Other regular expressions are also supported; you can do something like this to sync tables with similar names

# MySQL data source
[[source]]
schema = "dbx"
tables = ["tb[a-z]"]

If you want to sync more than one database, you can replicate this config with the respective db names. So the configuration for syncing dbx and dby will look like this

# MySQL data source
[[source]]
schema = "dbx"
tables = ["tb[a-z]"]
[[source]]
schema = "dby"
tables = ["tbc", "tbd"]

Index config

Say you want to sync table tba to index tb_index. This is configured under the rule section.

[[rule]]
schema = "dbx"
table = "tba"
index = "tb_index"
type = "tba"

Note: The default index and type names are the same as the table name. Change them according to your needs.

If you just want to sync particular columns from the table, use the config below

[[rule]] 
schema = "dbx"
table = "tba"
index = "tb_index"
type = "tba" 
 
filter = ["col_a"] 
 
[rule.field] 
col_a="name"

Here, filter lists the columns that are to be synced into the index, and rule.field defines the column-to-field mapping relation, i.e., in this case, the values from col_a will be mapped into the name field of tb_index.

The rule.field config also supports datatype conversion/specification.

This will map column col_a to the Elasticsearch field name

[rule.field]
 col_a="name"

This will map column col_a to the Elasticsearch field name and use the array type

[rule.field] 
col_a="name,list"

This will map column col_a to the Elasticsearch field col_a and use the array type

[rule.field] 
col_a=",list"

The list modifier will translate a MySQL string field like "a,b,c" into an Elasticsearch array type ["a", "b", "c"]. This is especially useful if you need to filter on those fields in Elasticsearch.
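
For example, once a row with col_a = "a,b,c" has been indexed as an array, documents can be matched on any one of those values (a minimal sketch, assuming the tb_index/name mapping from the earlier examples):

curl -X GET "localhost:9200/tb_index/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "term": { "name": "b" }
  }
}'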

If the created_time field type is "int" and you want to convert it to the "date" type in ES, you can do it as below

[rule.field] 
created_time=",date"

One more feature that we found really useful was the support for wildcards in the table specification. It’s especially helpful when you want to index a table that has been split into multiple chunks.

[[rule]] 
schema = "dbx"
table = "tb[a-z]"
index = "tb_index"
type = "tba" 
 
filter = ["col_a"] 
 
[rule.field] 
col_a="name"

Note: Make sure that all the tables matching the given wildcard have the same schema.

To sync multiple tables into different indices, just replicate the rule config with the corresponding table and index names, just like we did for multiple dbs.

So, for syncing tba and tbd into two indices, say tb_index_1 and tb_index_2, the config will be

[[rule]] 
schema = "dbx"
table = "tba"
index = "tb_index_1"
type = "tba" 
 
filter = ["col_a"] 
 
[rule.field] 
col_a="name"
[[rule]] 
schema = "dby"
table = "tbd"
index = "tb_index_2"
type = "tbd" 
 
filter = ["col_d"] 
 
[rule.field] 
col_d="name"

With these configurations, we were able to successfully set up a river for syncing MySQL data into ES.

Go-mysql-elasticsearch has some more configurations to offer; you can check them out in the project README.md file:
https://github.com/siddontang/go-mysql-elasticsearch#source

All done. Now let’s start our river. We’ll run it as a systemd service, so create a service file first

sudo nano /etc/systemd/system/go-mysql-es-river.service

Add the following lines to the service file

[Unit] 
Description=Instance to serve go mysql es river 
After=network.target 
 
[Service] 
User=ubuntu 
Group=www-data 
# Note: systemd does not expand shell variables like $GOPATH; replace it below with the absolute path to your Go workspace (e.g. /home/ubuntu/go)
WorkingDirectory=$GOPATH/src/github.com/siddontang/go-mysql-elasticsearch
ExecStart=$GOPATH/src/github.com/siddontang/go-mysql-elasticsearch/bin/go-mysql-elasticsearch -config=$GOPATH/src/github.com/siddontang/go-mysql-elasticsearch/etc/river.toml
 
[Install] 
WantedBy=multi-user.target

Save the file (Ctrl+o & hit enter) and exit (Ctrl+x)

Now let’s start our service

sudo systemctl start go-mysql-es-river.service

If you want your river to start at system boot, create a symlink for the service using

sudo systemctl enable go-mysql-es-river.service
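
To confirm that the river is running and documents are flowing in, you can check the service status and logs and compare the document count in the index (tb_index is from the earlier examples):

sudo systemctl status go-mysql-es-river.service
sudo journalctl -u go-mysql-es-river.service -f
curl -s "localhost:9200/tb_index/_count"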

Small takeaway

We faced an issue while syncing the mysqldump from RDS. The issue was not related to go-mysql-elasticsearch but rather a permission error related to RDS.

Couldn't execute 'FLUSH TABLES WITH READ LOCK': Access denied for user 'root'@'%' (using password: YES) (1045)

This was due to insufficient privileges to run mysqldump with the --master-data flag in AWS RDS.

If you face this error, you can comment out the mysqldump config in river.toml and fill the ES index yourself using the Index API or Bulk API (a minimal Bulk request is sketched below).
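
For reference, a minimal Bulk API request looks like this (the index name, document id, and field value are placeholders; in practice you would generate the request body from a query against your MySQL tables):

curl -X POST "localhost:9200/tb_index/_bulk" -H 'Content-Type: application/x-ndjson' -d'
{ "index": { "_id": "1" } }
{ "name": "value from col_a" }
'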

Alternatively, you can follow the steps below to populate the index using mysqldump.

  • Use mysqldump to dump the RDS MySQL server and restore it to a local MySQL db (a sketch of these commands is shown after this list).
  • Shutdown RDS MySQL server.
  • Start go-mysql-elasticsearch with my_addr set to local db and wait until it’s finished.
  • Stop go-mysql-elasticsearch and delete var/master.info.
  • Restart RDS MySQL server.
  • Restart go-mysql-elasticsearch with my_addr set to the RDS MySQL server and mysqldump set to empty.
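
A rough sketch of that first dump-and-restore step (the RDS endpoint, credentials, and the database name dbx are placeholders):

mysqldump -h <rds-endpoint> -u <user> -p --databases dbx > dbx_dump.sql
mysql -h 127.0.0.1 -u root -p < dbx_dump.sql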

Some notes from the developer

  • binlog format must be ROW.
  • binlog row image must be full for MySQL; you may lose some field data if you update PK data in MySQL with a minimal or noblob binlog row image. MariaDB only supports the full row image.
  • You cannot alter the table format at runtime.
  • The MySQL table to be synced should have a PK (primary key); multi-column PKs are allowed now, e.g., if the PK is (a, b), "a:b" will be used as the key. The PK data is used as the "id" in Elasticsearch. You can also configure the id's constituent parts with other columns.
  • You should create the associated mappings in Elasticsearch first; using the default mapping is not a wise decision if you need to search accurately.
  • mysqldump must exist on the same node as go-mysql-elasticsearch; if not, go-mysql-elasticsearch will try to sync the binlog only.
  • Don't change too many rows at the same time in one SQL statement.

Conclusions

This service worked out really well for our team: it automated data syncing between our DB instances and the search space, made the data transfer happen in real time, and tracked all the necessary CRUD operations. Ultimately, when we tested our application, we found that changes made in the SQL server were reflected in the corresponding ES indices in real time.

 

 

Author

Adarsh M S

A technology enthusiast with an urge to explore vast areas of advancing technologies. Experienced in domains like Computer Vision, Natural Language Processing, and Big Data. Believes in open-source contributions and loves to provide support to the community. Actively involved in building open-source tools related to information retrieval.
