Using Logstash to read from Kafka and write to AWS ES (TLS/non-TLS)

Goal: we have an AWS MSK (Managed Streaming for Apache Kafka) cluster and we want to use Logstash to consume data from MSK and write it to Elasticsearch.

Steps:

  1. Create an AWS MSK cluster. You can create it through the console or the CLI: https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html
  2. Once the cluster is ready, create a topic and push in some data using the console producer, following this doc: https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html
  3. Once you have the topic and data, create an AWS ES domain in the same VPC as the MSK cluster: https://docs.aws.amazon.com/elasticsearch-service/latest/developerguide/es-createupdatedomains.html
  4. Once the Elasticsearch domain is up, launch an EC2 instance in the same VPC as MSK and ES.
  5. Install Java 8.
  6. Install Logstash on the instance. You can follow this link: https://aws.amazon.com/elasticsearch-service/resources/articles/logstash-tutorial/
  7. Once Logstash is installed, install the amazon_es output plugin: bin/logstash-plugin install logstash-output-amazon_es
  8. Create a config file at /usr/share/logstash/config/logstash.config
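For steps 5–7, on an Amazon Linux instance the installation can be sketched roughly as below. The package names and repository setup are assumptions for Amazon Linux — check the linked tutorial for your distribution:

```
# Install Java 8 (package name assumes Amazon Linux / yum-based distro)
sudo yum install -y java-1.8.0-openjdk

# Install Logstash from the Elastic RPM repository (repo setup per the linked tutorial)
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
sudo yum install -y logstash

# Install the amazon_es output plugin
sudo /usr/share/logstash/bin/logstash-plugin install logstash-output-amazon_es
```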

If you do not have TLS-based communication, your config file can look like this:

input {
  kafka {
    bootstrap_servers => "<your-bootstrap-servers>"
    topics => ["<your-topic-name>"]
  }
}

output {
  stdout {}
  amazon_es {
    hosts => ["<your-vpc-endpoint-without-https://>"]
    region => "<your-region>"
    aws_access_key_id => "<your-access-key>"
    aws_secret_access_key => "<your-secret-key>"
    index => "kafka-data-%{+YYYY.MM.dd}"
  }
}
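The index setting uses Logstash's sprintf date formatting, so each UTC day of the event's @timestamp gets its own index. A quick Python illustration of how the %{+YYYY.MM.dd} pattern maps to index names (the index_for helper is just for illustration):

```python
from datetime import datetime, timezone

def index_for(ts: datetime) -> str:
    # Mirrors the "kafka-data-%{+YYYY.MM.dd}" sprintf pattern:
    # one index per UTC day of the event timestamp.
    return ts.strftime("kafka-data-%Y.%m.%d")

print(index_for(datetime(2020, 5, 17, tzinfo=timezone.utc)))  # kafka-data-2020.05.17
```

Daily indices make it easy to expire old data by deleting whole indices rather than individual documents.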

Now, if your MSK cluster is TLS-enabled and your communication is TLS-based, make sure you have a proper keystore, truststore, and keys installed. You can script the certificate setup; such a script would create files such as kafka.client.keystore.jks and kafka.client.truststore.jks.
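The certificate setup can be sketched as below, following the client-setup steps in the AWS MSK documentation. The JVM path, alias, and passwords here are placeholders/assumptions — adjust them for your instance:

```
# Build a truststore from the JVM's default CA certs
# (JVM path is an assumption; locate yours with: find / -name cacerts 2>/dev/null)
cp /usr/lib/jvm/java-1.8.0-openjdk/jre/lib/security/cacerts \
   /home/ec2-user/kafka.client.truststore.jks

# Create a client keystore (alias and passwords are placeholders)
keytool -genkey -keystore /home/ec2-user/kafka.client.keystore.jks \
        -validity 300 -storepass changeit -keypass changeit \
        -dname "CN=<your-distinguished-name>" -alias msk-client -storetype pkcs12
```

Note that for TLS encryption alone only the truststore is needed; the keystore (and a certificate signed via ACM Private CA) is only required if your cluster enforces mutual TLS client authentication.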

Once you have these files, change the input section of your Logstash config to:

input {
  kafka {
    bootstrap_servers => "<your-bootstrap-servers>"
    security_protocol => "SSL"
    ssl_truststore_location => "/home/ec2-user/kafka.client.truststore.jks" # change as per your location
    ssl_truststore_password => "changeit"
    ssl_keystore_location => "/home/ec2-user/kafka.client.keystore.jks" # change as per your location
    ssl_keystore_password => "changeit"
    ssl_key_password => "changeit"
    topics => ["<your-topic>"]
  }
}

Once the input and output sections are as expected, you can run Logstash:

sudo /usr/share/logstash/bin/logstash -f /usr/share/logstash/config/logstash.config
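Before starting the pipeline for real, you can ask Logstash to only validate the config file and exit, using its standard --config.test_and_exit flag:

```
sudo /usr/share/logstash/bin/logstash -f /usr/share/logstash/config/logstash.config --config.test_and_exit
```

This catches syntax errors (for example, smart quotes pasted from a web page) before Logstash tries to connect to Kafka or ES.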

Note: The Logstash kafka input starts reading from the latest offset by default (unlike the console consumer, there is no "from-beginning" flag), so you may need to push fresh data into the Kafka topic to see data flowing into ES.
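If you do want Logstash to read the topic from the beginning instead, the kafka input supports auto_offset_reset. A sketch (the group_id value is an assumption — the offset reset only takes effect for a consumer group with no previously committed offsets):

```
kafka {
  bootstrap_servers => "<your-bootstrap-servers>"
  topics => ["<your-topic-name>"]
  auto_offset_reset => "earliest"
  group_id => "logstash-fresh" # new group so "earliest" takes effect
}
```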

You can verify with:

curl -XGET <your-vpc-endpoint>/_cat/indices

This will list the indices on the domain, including the new kafka-data-* indices created from the Kafka data.
