Using Logstash to read from Kafka and write to AWS ES (TLS/Non-TLS)
--
Goal : we have an AWS MSK (Managed Streaming for Apache Kafka) cluster and we want to use Logstash to collect data from MSK and write it to ES.
Steps:
- Create an AWS MSK cluster. You can use the link below to create one through the console or the CLI: https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html
- Once you have the cluster ready, create a topic and push in some data using the console producer, following this doc: https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html
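For reference, the topic-creation and test-push steps above can be sketched as below. The ZooKeeper string, bootstrap servers, topic name, and the sample JSON schema are all placeholders/assumptions; the commented commands mirror the ones in the linked AWS doc and must be run from a client with the Kafka tools installed:

```shell
# Create a topic on the MSK cluster (ZooKeeper connect string comes from the MSK console):
#   bin/kafka-topics.sh --create --zookeeper "<your-zookeeper-connect-string>" \
#     --replication-factor 3 --partitions 1 --topic "<your-topic-name>"

# Generate a few sample JSON records (hypothetical schema) to push into the topic
for i in $(seq 1 10); do
  echo "{\"id\": $i, \"msg\": \"test-$i\"}"
done > sample.json

# Pipe them into the console producer:
#   bin/kafka-console-producer.sh --broker-list "<your-bootstrap-servers>" \
#     --topic "<your-topic-name>" < sample.json
```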
- Once you have the topic and data, create an AWS ES domain in the same VPC as MSK: https://docs.aws.amazon.com/elasticsearch-service/latest/developerguide/es-createupdatedomains.html
- Once the Elasticsearch domain is up, launch an EC2 instance in the same VPC as MSK and ES.
- Install Java 8.
- Then install Logstash on it. You can follow the link below for the same: https://aws.amazon.com/elasticsearch-service/resources/articles/logstash-tutorial/
- Once Logstash is installed, install the amazon_es output plugin: bin/logstash-plugin install logstash-output-amazon_es
- Create a config file at /usr/share/logstash/config/logstash.config
Now, if you do not have TLS-based communication, you can have the below in your config file:

input {
  kafka {
    bootstrap_servers => "<your-bootstrap-servers>"
    topics => ["<your-topic-name>"]
  }
}
output {
  stdout {}
  amazon_es {
    hosts => ["<your-vpc-endpoint-without-https://>"]
    region => "<your-region>"
    aws_access_key_id => "<your-access-key>"
    aws_secret_access_key => "<your-secret-key>"
    index => "kafka-data-%{+YYYY.MM.dd}"
  }
}
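If you want to smoke-test the amazon_es output on its own before wiring in Kafka, a minimal config using the stdin input could look like the sketch below (the angle-bracket values are placeholders, and the index name is just an example):

```
input { stdin {} }
output {
  amazon_es {
    hosts => ["<your-vpc-endpoint-without-https://>"]
    region => "<your-region>"
    aws_access_key_id => "<your-access-key>"
    aws_secret_access_key => "<your-secret-key>"
    index => "smoke-test-%{+YYYY.MM.dd}"
  }
}
```

Run Logstash with this file, type a line at the prompt, and check _cat/indices on the domain for the new index before switching the input back to kafka.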
Now, if you have a TLS-enabled MSK cluster and your communication is TLS based, make sure that you have the proper keystore, truststore, and keys installed. You can use the below script to automatically install those certificates:
This script automatically creates files such as kafka.client.keystore.jks and kafka.client.truststore.jks.
Once you have these files, change the input section of your Logstash config to:
input {
  kafka {
    bootstrap_servers => "<your-bootstrap-servers>"
    security_protocol => "SSL"
    ssl_truststore_location => "/home/ec2-user/kafka.client.truststore.jks" # change as per your location
    ssl_truststore_password => "changeit"
    ssl_keystore_location => "/home/ec2-user/kafka.client.keystore.jks" # change as per your location
    ssl_keystore_password => "changeit"
    ssl_key_password => "changeit"
    topics => ["<your-topic>"]
  }
}
Once you have the input/output as expected, you can run Logstash:
sudo /usr/share/logstash/bin/logstash -f /usr/share/logstash/config/logstash.config
(To validate the config file without starting the pipeline, add the --config.test_and_exit flag.)
Note: since the Kafka input has no "from-beginning" flag like the console consumer, and a new consumer group starts reading from the latest offset by default, you may need to push fresh data into the Kafka topic to see the data going into ES.
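Alternatively, the Logstash Kafka input supports an auto_offset_reset option; setting it to "earliest" makes a consumer group with no committed offsets start from the beginning of the topic, roughly the equivalent of --from-beginning. A sketch (the group_id value here is just an example):

```
input {
  kafka {
    bootstrap_servers => "<your-bootstrap-servers>"
    topics => ["<your-topic-name>"]
    auto_offset_reset => "earliest" # only applies when the group has no committed offsets
    group_id => "logstash-es-demo"  # pick a fresh group id to re-read existing data
  }
}
```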
You can use:
curl -XGET <your-vpc-endpoint>/_cat/indices
The above will show the new indices being created from the Kafka data.