The Gnar Company
The Gnar Company

STDIN Stream Processing Using Crystal Concurrency

by Nick Maloney

Recently, I embarked on building out a simple proof of concept where I needed to render the latest JSON object from a stream originating from STDIN. The source data comes from a PR'd version of the system monitoring tool atop that adds an option for JSON output. I wanted the ability to render the most recent JSON object via HTTP as a heartbeat monitor of sorts. As a side note, this was purely an academic exercise, if one simply needs machine metrics as JSON Prometheus Node Exporter will likely do the job.

The tool I'd normally reach for with tasks such as this is Golang, it provides great constructs for concurrency, JSON parsing, and serving HTTP. However, Crystal has been on my radar for some time so I decided to give it a try just to see how it would work for a problem such as this. The last time I had dabbled in Crystal the concurrency model was still largely a work in progress. Since then, the concurrency implementation has improved significantly, as have the language constructs around implementing fibers.

Getting Started

The first step in this process is defining the source data. The version of atop that I'm using produces a stream of JSON data that I'd like to pass into my server as STDIN. If I was to do a future iteration, I'd probably direct the stream into a socket and have the Crystal server read from that vs STDIN. A socket would arguably provide a cleaner, more deliberate data stream.

To generate the stream I run:

atop -O only

This produces a stream of large JSON objects containing all metrics provided by atop.

Processing JSON

The next step is to build out the mechanics for processing a stream of STDIN and converting it to JSON. As a reminder, this is purely a proof-of-concept and should not be used in production as the parsing is not resilient enough towards errors and the concurrency implementation is not tuned. Below is a simplified (mostly functional) example of how easy it is to parse JSON from STDIN using Crystal. Keep in mind, this assumes the stream separates the payloads by line breaks.

STDIN.each_line do |line|
  json = JSON.parse(line)
end

The above parsing does not handle receiving invalid JSON so we'll introduce some basic error handling in the implementation below.

Serving HTTP

Now that we have the parsing figured out, we'll need to setup the HTTP server. The Crystal stdlib is robust and includes an HTTP server. We'll configure it to respond to requests going to the /metrics path:

server = HTTP::Server.new do |context|
  case context.request.path
  when "/metrics"
    context.response.content_type = "text/plain"
    context.response.puts "HELLO METRICS"
    context.response.flush
  end
end

puts "Listening on http://127.0.0.1:8080"
server.listen(8080)

Putting it all together

At this point, we have most of the building blocks in place. The only piece missing is taking input from STDIN and feeding it to the web server. This is where concurrency comes into play. Crystal's use of fibers communicating via channels is similar to Golang's in terms of language semantics. We'll use them to receive STDIN and communicate the parsed JSON over a channel to the web server response.

Putting this all together gives us the following:

require "http/server"
require "json"

module Parser
  def self.parse(str : String)
    begin
      JSON.parse(str)
    end
  end
end

channel = Channel(JSON::Any).new

spawn do
  STDIN.each_line do |line|
    json = Parser.parse(line)
    channel.send(json)
  end
end

server = HTTP::Server.new do |context|
  case context.request.path
  when "/metrics"
    context.response.content_type = "text/plain"
    resp = channel.receive
    context.response.puts resp
    context.response.flush
  end
end

puts "Listening on http://127.0.0.1:8080"
server.listen(8080)

To run the server with output from atop we'd use the following command:

atop -O only | crystal server.cr

Visiting http://localhost:8080/metrics would give us the most recent JSON payload parsed from STDIN at the time of the request.

Summary

The Crystal language continues to impress me. It has a developer friendly syntax that the Rubyist in me appreciates while also having type safety, and approachable concurrency model similar to what Golang provides. I wasn't expecting this exercise to be as simple and terse as it was and I think that speaks to Crystal maturing as a language. I look forward to seeing where it goes in the coming year and the opportunity to potentially use it on some client projects.