Browse Source

Add a feature to handle a stream of nodes over the CLI

develop^2
Bèr Kessels 1 month ago
parent
commit
63ee57e8f6
5 changed files with 81 additions and 1 deletions
  1. 2
    0
      Gemfile
  2. 4
    0
      Gemfile.lock
  3. 45
    0
      bin/sink
  4. 24
    0
      test/integration/cli/sink_test.rb
  5. 6
    1
      test/support/event_helpers.rb

+ 2
- 0
Gemfile View File

@@ -19,6 +19,8 @@ gem 'sequel-postgis-georuby'
gem 'sinatra'
gem 'sinatra-contrib'
gem 'sprockets'
gem 'yajl'
gem 'yajl-ruby'

group :development, :test do
gem 'dotenv', '~> 2.6'

+ 4
- 0
Gemfile.lock View File

@@ -172,6 +172,8 @@ GEM
tzinfo (1.2.5)
thread_safe (~> 0.1)
unicode-display_width (1.4.1)
yajl (0.3.4)
yajl-ruby (1.4.1)

PLATFORMS
ruby
@@ -205,6 +207,8 @@ DEPENDENCIES
sinatra-contrib
sprockets
timecop
yajl
yajl-ruby

BUNDLED WITH
1.17.3

+ 45
- 0
bin/sink View File

@@ -0,0 +1,45 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

require 'bundler/setup'
require 'logger'
require 'yajl'

require_relative '../lib/hours.rb'
require_relative '../config/event_sourcery.rb'

LOG_LEVEL = ENV['LOG_LEVEL'].to_i || Logger::DEBUG

# Handles a stream of nodes in STDIN emits them as events through the
# AddNodeCommand.
class EventSink
def initialize
@parser = Yajl::Parser.new(symbolize_keys: true)
@parser.on_parse_complete = method(:object_parsed)

@logger = Logger.new(STDOUT)
EventSourcery.logger.level = @logger.level = LOG_LEVEL
end

def object_parsed(obj)
log(Logger::DEBUG, '-- parsed object')
as_params = obj.merge(node_id: SecureRandom.uuid)
command = Hours::AddNodeCommand.build(as_params)
Hours::CommandHandler.new.handle(command)
end

def call(io)
@parser.parse(io)
rescue Yajl::ParseError => e
log(Logger::ERROR, "ERROR: #{e}")
close_connection
end

private

def log(severity, message = nil, progname = 'sink')
@logger.add(severity, message, progname)
end
end

EventSink.new.call(STDIN)

+ 24
- 0
test/integration/cli/sink_test.rb View File

@@ -0,0 +1,24 @@
# frozen_string_literal: true

require 'test_helper'
require 'open3'

describe 'sink' do
describe 'pipe to sink' do
let(:payload) { json_fixtures('db/full.json') }

it 'creates an event' do
status_list = Open3.pipeline(
['cat', fixtures('db/full.json')],
[{ 'LOG_LEVEL' => '1' }, './bin/sink']
)

status_list.each { |status| assert status.success? }

assert_kind_of(NodeAdded, last_event)
refute_nil(last_event.aggregate_id)
assert_equal(last_event.body['properties'],
payload.fetch(:properties).transform_keys(&:to_s))
end
end
end

+ 6
- 1
test/support/event_helpers.rb View File

@@ -3,7 +3,12 @@
##
# Helpers for testing events.
module EventHelpers
def last_event(aggregate_id)
def last_event(aggregate_id = nil)
unless aggregate_id
event_id = Hours.event_store.latest_event_id
aggregate_id = Hours.event_store.get_next_from(event_id).last.aggregate_id
end

Hours.event_store.get_events_for_aggregate_id(aggregate_id).last
end


Loading…
Cancel
Save