In this data-driven age, whether you are building a small-scale or a large-scale application, one of the main ingredients is data. These essential blobs are housed in a NoSQL or SQL database, depending on the data's nature and the developer's choice.
Recently, my colleague and I were working on a scoring system that scored users based on different criteria. Since our data was highly relational, we chose a MySQL server for warehousing it. After some brainstorming and idea sharing, we were able to lay out an effective schema that aligned with our data and requirements.
After implementing the algorithms and other supporting data pipelines, we ran some benchmark tests. One of the inferences we made was that our engine would require an auto-correction / completion feature, since data coming from a wide user base tends to have typos, abbreviations, etc. We chose Elasticsearch (a great tool for storing and querying giant text datasets) to create the search space.
Elasticsearch is a flexible, powerful, open-source, distributed, real-time search and analytics engine. It is fast, easy to use, and useful in many scenarios. To implement such a search space, it was necessary to stream data from the MySQL databases to ES indices. MySQL dumps can be used to seed the indices for the first time, but the real challenge was that the data was susceptible to change, so we needed to establish a continuous flow of data between the MySQL databases and the ES indices.
Thus began our hunt for such a mechanism on the internet, and we came across some really incredible projects. One of them was the Elastic Stack's own Logstash; though it seemed promising, it couldn't meet all our requirements. And then we came across this awesome service built with Go.
Go-mysql-elasticsearch is a service that syncs your MySQL data into Elasticsearch automatically. It uses mysqldump to fetch the original data first, then syncs data incrementally with the binlog. So, now let's get down to business and set this up.
If your environment is not configured with these requirements, follow the above links for installation instructions. Although the developer suggests using MySQL <8.0 and ES <6.0, we implemented it with MySQL 5.7 and ES 7.7 and it worked without any issues.
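Before moving on, it is worth quickly confirming that the prerequisites are actually installed and reachable. A minimal sanity check, assuming the default local ports (MySQL on 3306, Elasticsearch on 9200):
go version
mysql --version
curl -s http://127.0.0.1:9200   # should return a JSON blob containing the ES version number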
A river is an easy way to set up a continuous stream of data going into your Elasticsearch data warehouse. It is a plug-in service that operates within the Elasticsearch cluster and pulls in data (or is pushed data) that is then indexed in the cluster. It is more practical than the traditional method of manual data indexing because, once configured, all data is updated automatically. This reduces complexity and also enables the creation of a real-time system. In this article, I'll explain how to establish a MySQL river for Elasticsearch.
Installing MySQL river
Run the following command from your terminal to fetch the project
go get github.com/siddontang/go-mysql-elasticsearch
Note: It may print some messages; you can ignore them. The default download location is $HOME/go; if you want to use a custom location, configure it with the GOPATH variable in your environment.
Add the following line to your ~/.bashrc file
export GOPATH=/path/to/destination/folder
Now run the following command to update the system variables
source ~/.bashrc
Now the project will be downloaded and installed into $GOPATH, which contains a directory src/github.com/siddontang/go-mysql-elasticsearch/, along with the compiled packages (in pkg/) for those libraries and their dependencies.
Change to the project directory
cd $HOME/go/src/github.com/siddontang/go-mysql-elasticsearch
-- or --
cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch
Let's build the executable for the program. Run the following command in your terminal
make
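The build drops the executable into the project's bin/ directory. You can confirm it is there and, once river.toml is configured (covered below), give it a quick foreground test run before wiring it into a service; stop it with Ctrl+C:
ls bin/go-mysql-elasticsearch
./bin/go-mysql-elasticsearch -config=./etc/river.toml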
Configure MySQL
Run the following command from your mysql shell
SET GLOBAL binlog_format = 'ROW';
-- or, for individual clients, use --
SET SESSION binlog_format = 'ROW';
Note: Any MySQL table which will be synced must have a primary key (PK); multi-column PKs are also allowed.
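To double-check both the binlog setting and the primary-key requirement from your terminal, something like the following can be used; the dbx.tba table name is just the placeholder used in the examples later in this post:
mysql -u root -p -e "SHOW VARIABLES LIKE 'binlog_format';"   # should report ROW
mysql -u root -p -e "SHOW VARIABLES LIKE 'log_bin';"         # binary logging must be ON
mysql -u root -p -e "SHOW KEYS FROM dbx.tba WHERE Key_name = 'PRIMARY';"   # confirms the table has a PK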
Configure Elasticsearch
Create the associated Elasticsearch index, document type, and mappings if possible; if not, Elasticsearch will create them automatically.
Note: If the ES index is not configured, the default mappings will be used. Our approach required accurate search results, so we created the index with custom mappings.
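As a rough sketch of that approach, an index with explicit mappings can be created up front through the ES REST API. The index name tb_index and the fields below are only placeholders borrowed from the examples later in this post, not our actual schema:
curl -X PUT "http://127.0.0.1:9200/tb_index" \
  -H 'Content-Type: application/json' \
  -d '{
    "mappings": {
      "properties": {
        "name": { "type": "text" },
        "created_time": { "type": "date" }
      }
    }
  }'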
Configure River
The mysql-es configurations are provided in the river.toml file under $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch/etc
Change working directory to
cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch/etc
Open the river.toml config in your preferred editor
nano river.toml
MySQL config
# MySQL address, user and password
# user must have replication privilege in MySQL.
my_addr = "127.0.0.1:3306"
my_user = "root"
my_pass = ""
my_charset = "utf8"
You can also use cloud-hosted MySQL services like RDS. Next, specify the flavor of your database with the flavor param.
# mysql or mariadb
flavor = "mysql"
“Go-mysql-elasticsearch uses mysqldump to fetch the origin data at first, then sync data incrementally with binlog,” so if you want to index the existing data as well, specify the mysqldump execution path with the following config; if not, just comment it out or leave it empty.
# mysqldump execution path
# if not set or empty, ignore mysqldump.
mysqldump = "./var/mysqldump"
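If you are not sure where the mysqldump binary lives on your machine, you can locate it and point this setting at that path instead of the placeholder above:
which mysqldump   # e.g. /usr/bin/mysqldump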
Note: We had some issues while trying to restore the dump from RDS, so if you are using an RDS instance, check the takeaways section of this post to know more about this issue.
ES config
# Set true when elasticsearch use https
#es_https = false
# Elasticsearch address
es_addr = "127.0.0.1:9200"
# Elasticsearch user and password, maybe set by shield, nginx, or x-pack
es_user = ""
es_pass = ""
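A quick way to confirm that the address and credentials in this block are correct is to query the cluster health directly (assuming the local defaults above; the -u flag is only needed if security is enabled):
curl -s "http://127.0.0.1:9200/_cluster/health?pretty"
curl -s -u "es_user:es_pass" "http://127.0.0.1:9200/_cluster/health?pretty"   # with shield / x-pack auth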
Database config
Let's say we have a database named dbx with tables tba, tbb, tbc … To sync this database and its desired tables, we need to specify them under the source config
# MySQL data source
[[source]]
schema = "dbx"
tables = ["tba", "tbb"]
If you want to sync all the tables in a db, just use the ‘*’ wildcard.
# MySQL data source
[[source]]
schema = "dbx"
tables = ["*"]
Other regular expressions are also supported; you can do something like this to sync tables with a similar naming format
# MySQL data source
[[source]]
schema = "dbx"
tables = ["tb[a-z]"]
If you want to sync more than one database, you can replicate this config with the respective db names. So the configuration for syncing dbx and dby will look like this
# MySQL data source
[[source]]
schema = "dbx"
tables = ["tb[a-z]"]

[[source]]
schema = "dby"
tables = ["tbc", "tbd"]
Index config
Say you want to sync table tba to index tb_index. This is configured under the rule section.
[[rule]]
schema = "dbx"
table = "tba"
index = "tb_index"
type = "tba"
Note: By default, the index and type names are the same as the table name. Change them according to your needs.
If you want to sync only particular columns from the table, use the below config
[[rule]]
schema = "dbx"
table = "tba"
index = "tb_index"
type = "tba"
filter = ["col_a"]
[rule.field]
col_a="name"
Here, filter lists the columns which are to be synced into the index, and rule.field defines the column-to-field mapping relation, i.e., in this case, the values from col_a will be mapped into the name field of tb_index.
The rule.field config also supports datatype conversion/specification.
This will map column col_a to the Elasticsearch field name:
[rule.field]
col_a="name"
This will map column col_a to the Elasticsearch field name and use the array type:
[rule.field]
col_a="name,list"
This will map column col_a to the Elasticsearch field col_a and use the array type:
[rule.field]
col_a=",list"
The "list" modifier will translate a MySQL string field like "a,b,c" into an Elasticsearch array type ["a", "b", "c"]; this is especially useful if you need to filter on those fields in Elasticsearch.
If the created_time field type is "int" and you want to convert it to the "date" type in ES, you can do it as below
[rule.field]
created_time=",date"
One more feature that we found really useful was the support for wildcards in the table specification. It's especially helpful when a table is split into chunks that should all be indexed together.
[[rule]]
schema = "dbx"
table = "tb[a-z]"
index = "tb_index"
type = "tba"
filter = ["col_a"]
[rule.field]
col_a="name"
Note: Make sure that all the tables matching the given wildcard have the same schema.
To sync multiple tables into different indices, just replicate the rule config with the corresponding table and index names, just like we did for multiple databases.
So, for syncing tba and tbd into two indices, say tb_index_1 and tb_index_2, the config will be
[[rule]]
schema = "dbx"
table = "tba"
index = "tb_index_1"
type = "tba"
filter = ["col_a"]
[rule.field]
col_a="name"

[[rule]]
schema = "dby"
table = "tbd"
index = "tb_index_2"
type = "tbd"
filter = ["col_d"]
[rule.field]
col_d="name"
With these configurations, we were able to successfully set up a river for syncing MySQL data into ES.
Go-mysql-elasticsearch has some more configuration options to offer; you can check them out in the project's README.md file:
https://github.com/siddontang/go-mysql-elasticsearch#source
All done. Now let's start our river. We'll run it as a systemd service, so first create a service file
sudo nano /etc/systemd/system/go-mysql-es-river.service
Add the following lines to the service file
[Unit]
Description=Instance to serve go mysql es river
After=network.target

[Service]
User=ubuntu
Group=www-data
WorkingDirectory=$GOPATH/src/github.com/siddontang/go-mysql-elasticsearch
ExecStart=$GOPATH/src/github.com/siddontang/go-mysql-elasticsearch/bin/go-mysql-elasticsearch -config=$GOPATH/src/github.com/siddontang/go-mysql-elasticsearch/etc/river.toml

[Install]
WantedBy=multi-user.target

Note: systemd does not read your shell's GOPATH, so replace $GOPATH in the unit file with the actual absolute path (for example /home/ubuntu/go), or define it for the service with an Environment= line under [Service].
Save the file (Ctrl+o & hit enter) and exit (Ctrl+x)
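Since we just added a new unit file, tell systemd to reload its configuration so it can pick the service up:
sudo systemctl daemon-reload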
Now let’s start our service
sudo systemctl start go-mysql-es-river.service
If you want your river to start at system startup, then create a symlink for the service using
sudo systemctl enable go-mysql-es-river.service
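To verify that the river actually came up, and to watch its sync logs while testing, the usual systemd tooling works:
sudo systemctl status go-mysql-es-river.service
sudo journalctl -u go-mysql-es-river.service -f   # follow the live logs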
Takeaways
We faced an issue while syncing the mysqldump from RDS. The issue was not related to go-mysql-elasticsearch, but rather a permission error related to RDS.
Couldn't execute 'FLUSH TABLES WITH READ LOCK': Access denied for user 'root'@'%' (using password: YES) (1045)
This was due to insufficient privileges to run mysqldump with the --master-data flag in AWS RDS.
If you face this error, you can either comment out the mysqldump config in river.toml and fill the ES index yourself using the Index API or Bulk API, or follow the steps below to populate the index using mysqldump.
1. Set my_addr to a local db (holding a copy of the RDS data) and wait until the dump sync is finished.
2. Then set my_addr to the RDS MySQL server and set mysqldump to empty.

Note: mysqldump must exist on the same node as go-mysql-elasticsearch; if not, go-mysql-elasticsearch will try to sync the binlog only.

This service had a great effect on our team: it automated the syncing of data between our DB instances and the search space, made the data transfer happen in real time, and tracked all the necessary CRUD operations. Ultimately, when we tested our application, we found that changes made in the SQL server were reflected in the corresponding ES indices in real time.