Browse Source

Merge branch 'feature/update' into develop

* feature/update:
  Remove redundant comments by making it explicit in the Makefile
  Introduce a PlaceUpdated event that updates a projection
  Add website attribute on places table
  Turn address database into a projection.
feature/region-slug-gone
Bèr Kessels 5 months ago
parent
commit
c7c342829c

+ 4
- 1
Makefile View File

@@ -1,6 +1,9 @@
CMD_PREFIX=bundle exec
CONTAINER_NAME=hours_development
TEST_FILES_PATTERN ?= **/*_test.rb
ENV_FILE=.env

include $(ENV_FILE)

# You want latexmk to *always* run, because make does not have all the info.
# Also, include non-file targets in .PHONY so they are run regardless of any
@@ -30,6 +33,7 @@ run:
$(CMD_PREFIX) foreman start

import:
./bin/openaddr2copy < /mnt/sda/OSM/openaddr/nl/countrywide.csv | psql postgresql://$(DB_USER):$(DB_PASSWORD)@$(DB_HOST):$(DB_PORT)/$(DB_NAME) -c "COPY query_addresses FROM STDIN DELIMITER AS ',' CSV HEADER"
osmium tags-filter /mnt/sda/OSM/netherlands-latest.osm.pbf n/amenity=cafe,bar,restaurant,biergarten,fast_food,food_court,ice_cream,pub n/shop n/amenity=bicycle_parking,parking,parking_entrance --output-format osm | osm2geojson | bin/sink

_docker-start:
@@ -39,7 +43,6 @@ _db-setup:
$(CMD_PREFIX) rake db:create
$(CMD_PREFIX) rake db:event_store
$(CMD_PREFIX) rake db:projections
$(CMD_PREFIX) rake db:seed # TODO: don't run seeds on test
@if [ $(APP_ENV) = 'test' ]; then ./bin/seedaddr < test/fixtures/address_sample.csv; fi

_wait:

+ 1
- 0
Rakefile View File

@@ -76,6 +76,7 @@ namespace :db do
desc 'Re Create Projections database and tables'
task projections: :environment do
Hours::Projections::Places::Projector.new.setup
Hours::Projections::Addresses::Projector.new.setup
end
end


+ 10
- 3
app/aggregates/place.rb View File

@@ -1,6 +1,7 @@
# frozen_string_literal: true

require_relative '../events/place_added.rb'
require_relative '../events/place_updated.rb'
require_relative './place/address.rb'
require_relative './place/place_id.rb'
require_relative './place/region.rb'
@@ -31,13 +32,19 @@ module Hours
@aggregate_id = event.aggregate_id
end

def add(payload)
raise DuplicateError, "Place #{id.inspect} already exists" if added?
apply PlaceUpdated do |event|
@aggregate_id = event.aggregate_id
end

def add(payload)
@payload = Hashie.symbolize_keys(payload)
fill_payload

apply_event(PlaceAdded, aggregate_id: id, body: @payload)
if added?
apply_event(PlaceUpdated, aggregate_id: id, body: @payload)
else
apply_event(PlaceAdded, aggregate_id: id, body: @payload)
end
self
end


+ 1
- 1
app/aggregates/place/address.rb View File

@@ -37,7 +37,7 @@ module Hours
end

def dataset
Hours.address_database[ADDRESS_TABLE]
Hours.projections_database[ADDRESS_TABLE]
end

def address

app/events/place_proposed.rb → app/events/place_updated.rb View File

@@ -2,4 +2,4 @@

require 'event_sourcery/event'

PlaceProposed = Class.new(EventSourcery::Event)
PlaceUpdated = Class.new(EventSourcery::Event)

+ 1
- 1
app/models/place.rb View File

@@ -16,7 +16,7 @@ module Hours
PARSER = OpeningHoursConverter::OpeningHoursParser.new

attr_accessor :id, :location, :place_id, :name,
:region_slug, :distance
:region_slug, :distance, :website
attr_writer :address, :opening_hours

def lat

+ 48
- 0
app/projections/addresses.rb View File

@@ -0,0 +1,48 @@
# frozen_string_literal: true

module Hours
module Projections
module Addresses
##
# Handles the Places Projection
class Projector
include EventSourcery::Postgres::Projector
ADDRESS_TABLE_NAME = :query_addresses

projector_name :addresses

# Database tables that form the projection.
table ADDRESS_TABLE_NAME do
column :hash, :varchar, null: false, size: 255, primary_key: true
column :number, :varchar, size: 32
column :street, :varchar, size: 255
column :unit, :varchar, size: 255
column :city, :varchar, size: 255
column :district, :varchar, size: 255
column :region, :varchar, size: 255
column :postcode, :varchar, size: 32
column :location, 'geography(POINT)'

index :hash, unique: true
end

def setup
status = super
@db_connection.run(index_gist)
status
end

private

def index_gist
col = :location
name = "#{ADDRESS_TABLE_NAME}_#{col}"
"CREATE INDEX #{name} ON #{ADDRESS_TABLE_NAME} USING GIST(#{col})"
end

# Directly imported with Postgres COPY CMD for now. So no projection
# handler yet.
end
end
end
end

+ 24
- 2
app/projections/places.rb View File

@@ -1,6 +1,7 @@
# frozen_string_literal: true

require Hours.base_path.join('app', 'events', 'place_added.rb')
require Hours.base_path.join('app', 'events', 'place_updated.rb')

module Hours
module Projections
@@ -18,6 +19,7 @@ module Hours
column :place_id, :varchar, null: false, size: 255
column :region_slug, :varchar, null: false, size: 255
column :name, :text
column :website, :varchar, size: 255
column :opening_hours, :text
column :location, 'geography(POINT)'
column :address, :hstore
@@ -40,6 +42,7 @@ module Hours
location: GeoRuby::SimpleFeatures::Point.from_coordinates(
geometry['coordinates']
),
website: properties['contact:website'],
region_slug: properties['region_slug'],
address: Sequel.hstore(
street: properties['addr:street'],
@@ -52,8 +55,27 @@ module Hours
end
end

def exists?(place_id)
table.where(place_id: place_id).any?
project PlaceUpdated do |event|
geometry = event.body['geometry']
properties = event.body['properties']

table.where(id: event.aggregate_id).update(
place_id: properties['place_id'],
name: properties['name'],
opening_hours: properties['opening_hours'],
location: GeoRuby::SimpleFeatures::Point.from_coordinates(
geometry['coordinates']
),
website: properties['contact:website'],
region_slug: properties['region_slug'],
address: Sequel.hstore(
street: properties['addr:street'],
housenumber: properties['addr:housenumber'],
postcode: properties['addr:postcode'],
city: properties['addr:city'],
addr_country_code: properties['addr:country']
)
)
end
end
end

+ 2
- 1
app/serializers/place.rb View File

@@ -9,7 +9,8 @@ module Hours
class Place < JSONAPI::Serializable::Resource
type 'place'

attributes :name, :lat, :lon, :status, :open_this_week, :place_id
attributes :name, :lat, :lon, :status, :open_this_week, :place_id,
:website
id { @object.id }

attribute :raw_opening_hours do

+ 3
- 16
bin/openaddr2copy View File

@@ -11,23 +11,10 @@ set -o pipefail
# Turn on traces, useful while debugging but commented out by default
# set -o xtrace

# Assuming a table like this:
# CREATE TABLE query_addresses (
# hash char(16) PRIMARY KEY,
# number varchar(32),
# street varchar(255),
# unit varchar(255),
# city varchar(255),
# district varchar(255),
# region varchar(255),
# postcode varchar(32),
# location GEOMETRY(POINT, 4326)
# );
# CREATE INDEX query_address_location ON query_addresses USING GIST(location);
# It can be inserted into postgres using:
# ./bin/openaddr < openaddr/nl/countrywide.csv | psql $DB_URL -c "COPY query_addresses FROM STDIN DELIMITER AS ',' CSV HEADER"

# Print the new header
# Remove duplicates
# Insert the WKT columns
# Take only the columns in the table
csvtk rename -f1-11 -n'lon,lat,number,street,unit,city,district,region,postcode,id,hash' - \
| csvtk uniq -f hash \
| csvtk mutate2 -n location -e '"SRID=4326;POINT(" + $lon + " " + $lat + ")"' \

+ 2
- 2
lib/app.rb View File

@@ -105,10 +105,10 @@ module Hours
body render_json(places_index.to_a, page_links(places_index.paginator))
end

post '/places' do
put '/places' do
command = Hours::AddPlaceCommand.build(json_params)
Hours::CommandHandler.new.handle(command)
status 201
status 202
headers 'Location' => places_url(command.aggregate_id)
end


+ 0
- 6
lib/hours.rb View File

@@ -67,12 +67,6 @@ module Hours
EventSourcery::Postgres.config.projections_database
end

def self.address_database
# For now, we're lazy and stick the addresses in the same projections
# database. It could (and probably should) be a separate database.
EventSourcery::Postgres.config.projections_database
end

def self.repository
@repository ||= EventSourcery::Repository.new(
event_source: event_source,

+ 1
- 0
test/fixtures/output/hm_burchtstraat.json View File

@@ -53,6 +53,7 @@
]
},
"raw_opening_hours": "Mo-We 10:00-18:00; Th 10:00-21:00; Fr 10:00-18:00; Sa 09:30-17:30; Su 12:00-17:30",
"website": null,
"address": {
"postcode": "6511RA",
"city": "Nijmegen",

+ 1
- 0
test/fixtures/output/places.json View File

@@ -50,6 +50,7 @@
},
"name" : "H&M",
"status" : "open",
"website": null,
"lat": 51.8473397,
"lon": 5.8653234,
"address" : {

+ 52
- 18
test/integration/api/add_places_test.rb View File

@@ -8,10 +8,14 @@ describe 'add place' do
let(:lon) { 5.86 }
let(:payload) { json_fixtures('input/hm_broerstraat.json') }

before do
header('Accept', 'application/json')
end

it 'returns success' do
post_json '/places', payload
put_json '/places', payload

assert_status(201)
assert_status(202)
assert_match(%r{http://example.org/places/(.*)},
last_response.headers['Location'])
assert_kind_of(PlaceAdded, last_event(id_from_header))
@@ -21,7 +25,7 @@ describe 'add place' do
it 'adds a place to places projection' do
setup_projectors

post_json '/places', payload
put_json '/places', payload

projector_process_event(id_from_header)

@@ -32,27 +36,57 @@ describe 'add place' do
end

describe 'a place with the same name in a few meter radius' do
let(:payload_nearby) { json_fixtures('input/hm_broerstraat.json') }
let(:payload) { json_fixtures('input/hm_broerstraat.json') }
let(:payload_updated) do
payload.extend(Hashie::Extensions::DeepMerge)
payload.deep_merge(
properties: { 'contact:website': 'https://www.hm.com/nl' }
)
end

before do
post_json '/places', payload
put_json '/places', payload
@aggregate_ids = [id_from_header]
projector_process_event(@aggregate_ids.first)
end

it 'does not add a new place in the query repo' do
it 'updates the old place' do
assert_equal(1, Hours::Projections::PlacesQuery.build.handle.count)

post_json '/places', payload_nearby
assert_status(422)
put_json '/places', payload_updated
assert_status(202)
location = last_response.headers['Location']

assert_match(/already exists/, last_response.body)
matches = last_response.match(/Place "([^"]*)" already/)
refute_nil matches
@aggregate_ids << matches[1]
assert_equal(@aggregate_ids.first, @aggregate_ids.last)
# Follow the location
get location
assert_status(200)
# Before the processor has had a chance to update, we get the old one
expected = {
data: {
attributes: {
name: 'H&M'
}.ignore_extra_keys!
}.ignore_extra_keys!
}

assert_json_match(expected, last_response.body)

# Run the processor to update the data
projector_process_event(@aggregate_ids.last)
# Refresh
get location
assert_status(200)
expected = {
data: {
attributes: {
name: 'H&M',
website: 'https://www.hm.com/nl'
}.ignore_extra_keys!
}.ignore_extra_keys!
}
assert_json_match(expected, last_response.body)

# Assert we did not add a new one.
assert_equal(1, Hours::Projections::PlacesQuery.build.handle.count)
end
end
@@ -66,8 +100,8 @@ describe 'add place' do

it 'adds a region from the geometry point' do
name = payload[:properties][:name]
post_json '/places', payload
assert_status(201)
put_json '/places', payload
assert_status(202)
projector_process_event(id_from_header)

region = Hours::Projections::RegionQuery.build.handle('nijmegen')
@@ -87,8 +121,8 @@ describe 'add place' do
end

it 'adds an address from the geometry point' do
post_json '/places', payload
assert_status(201)
put_json '/places', payload
assert_status(202)
projector_process_event(id_from_header)

get "/places/#{id_from_header}"
@@ -112,7 +146,7 @@ describe 'add place' do
it 'returns bad request for missting missing geometry' do
payload_missing_geom = payload.slice(:type, :id, :properties)
assert_nil payload_missing_geom[:geometry]
post_json '/places', payload_missing_geom
put_json '/places', payload_missing_geom

assert_status(400)
assert_equal last_response.body, 'Bad Request: geometry is blank'

+ 12
- 20
test/integration/cli/sink_test.rb View File

@@ -8,31 +8,23 @@ describe 'sink' do
let(:payload) { json_fixtures('input/hm_broerstraat.json') }

it 'creates an event' do
status_list = Open3.pipeline(
['cat', fixtures('input/hm_broerstraat.json')],
[{ 'LOG_LEVEL' => '1' }, './bin/sink']
)

status_list.each { |status| assert status.success? }

run_sink_pipe
assert_kind_of(PlaceAdded, last_event)
refute_nil(last_event.aggregate_id)
assert_equal(last_event.body['properties']['name'], 'H&M')
end

it 'logs duplicates and continues' do
status_list = Open3.pipeline(
['cat', fixtures('input/hm_broerstraat.json')],
[{ 'LOG_LEVEL' => '1' }, './bin/sink']
)
status_list.each { |status| assert status.success? }

stdout, stderr, status = Open3.capture3(
"cat #{fixtures('input/hm_broerstraat.json')} | LOG_LEVEL=1 ./bin/sink"
)
# Don't stop with an error
assert status.success?, "Expected success, got #{stderr}"
assert_match(/Place .* already exists/, stdout.to_s)
it 'updates duplicates and continues' do
run_sink_pipe
run_sink_pipe
end
end

def run_sink_pipe
status_list = Open3.pipeline(
['cat', fixtures('input/hm_broerstraat.json')],
[{ 'LOG_LEVEL' => '1' }, './bin/sink']
)
status_list.each { |status| assert status.success? }
end
end

+ 2
- 2
test/support/request_helpers.rb View File

@@ -17,9 +17,9 @@ module RequestHelpers
matches[1] if matches
end

def post_json(url, body, headers = {})
def put_json(url, body, headers = {})
defaults = { 'Content-Type' => 'application/json' }
post url, body.to_json, headers.merge(defaults)
put url, body.to_json, headers.merge(defaults)
end

def assert_status(status, message = nil)

Loading…
Cancel
Save