Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Dictionaries = "85a47980-9c8c-11e8-2b9f-f7ca1fa99fb4"
DiffEqCallbacks = "459566f4-90b8-5000-8ac3-15dfb0a30def"
Graphs = "86223c79-3864-5bf0-83f7-82e725a168b6"
Legolas = "741b9549-f6ed-4911-9fbf-4a1c0c97f0cd"
ModelingToolkit = "961ee093-0014-501f-94e3-6117800e7a78"
ModelingToolkitStandardLibrary = "16a59e39-deab-5bd0-87e4-056b12336739"
ModflowInterface = "a348c28d-122a-4d3f-b833-160dba4591a7"
Expand All @@ -21,6 +22,7 @@ PlyIO = "42171d58-473b-503a-8d5f-782019eb09ec"
SciMLBase = "0bca4576-84f4-4d90-8ffe-ffa030f20462"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
TOML = "fa267f1f-6049-4f14-aa54-33bafae1ed76"
Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"

[compat]
Arrow = "2.3"
Expand All @@ -30,6 +32,7 @@ DataInterpolations = "3.7"
Dictionaries = "0.3"
DiffEqCallbacks = "2.23"
Graphs = "1.6"
Legolas = "0.5"
ModelingToolkit = "8.33"
ModelingToolkitStandardLibrary = "1.5"
ModflowInterface = "1"
Expand All @@ -38,6 +41,7 @@ OrdinaryDiffEq = "6.7"
PlyIO = "1.1.2"
SciMLBase = "1.60"
TOML = "1"
Tables = "1"
julia = "1.8"

[extras]
Expand Down
2 changes: 1 addition & 1 deletion docs/index.qmd
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ the given timestamp is reached.

column | type | restriction
-------- | -------- | -----------
id | Int | -
id | Int | sorted
variable | String | -
value | Float64 | depending on variable

Expand Down
2 changes: 2 additions & 0 deletions src/Ribasim.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ using Dictionaries
using Graphs
using DataInterpolations: LinearInterpolation
using DiffEqCallbacks
using Legolas: Legolas, @schema, @version, validate
using ModelingToolkit
using ModelingToolkit: getname, renamespace
using ModelingToolkitStandardLibrary.Blocks
Expand All @@ -23,6 +24,7 @@ using Serialization: serialize, deserialize

export interpolator, Register, ForwardFill

include("validation.jl")
Comment thread
evetion marked this conversation as resolved.
include("utils.jl")
include("modflow.jl")
include("lib.jl")
Expand Down
18 changes: 13 additions & 5 deletions src/bmi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,19 @@ end
# Read into memory for now with read, to avoid locking the file, since it mmaps otherwise.
# We could pass Mmap.mmap(path) ourselves and make sure it gets closed, since Arrow.Table
# does not have an io handle to close.
read_table(entry::AbstractString) = Arrow.Table(read(entry))

function read_table(entry)
@assert Tables.istable(entry)
return entry
_read_table(entry::AbstractString) = Arrow.Table(read(entry))
_read_table(entry) = entry

function read_table(entry; schema = nothing)
table = _read_table(entry)
@assert Tables.istable(table)
if !isnothing(schema)
sv = schema()
validate(Tables.schema(table), sv)
R = Legolas.record_type(sv)
foreach(R, Tables.rows(table)) # construct each row
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know roughly how long this takes for the national schematization? Especially the forcing can contain millions of rows, and sometimes only a fraction of them is used for short calculations.

Also, the result of this loop is not kept, that means that we only get errors, but any changes like clamp in the Legolas tour is never done, or not?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct, at the moment I only check the types and for empty strings. And I rather throw on error than clamp some values, but that's up for debate.

Copy link
Copy Markdown
Member

@visr visr Jan 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok yeah right now we don't use something like clamp, and I can't directly think of a use case where it would be better than an error, so let's keep it like this.

Performance was answered here: #43 (comment)

end
return DataFrame(table)
end

"Create an extra column in the forcing which is 0 or the index into the system parameters"
Expand Down
24 changes: 15 additions & 9 deletions src/construction.jl
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
"Load all Arrow input data to SubDataFrames that are filtered for used IDs"
function load_data(config::Dict, starttime::DateTime, endtime::DateTime)
node = DataFrame(read_table(config["node"]))
edge = DataFrame(read_table(config["edge"]))
state = DataFrame(read_table(config["state"]))
static = DataFrame(read_table(config["static"]))
profile = DataFrame(read_table(config["profile"]))
forcing = DataFrame(read_table(config["forcing"]))
# Load data and validate schema + rows for required field types and values
node = read_table(config["node"]; schema = NodeV1SchemaVersion)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason to pass the type, and not its instance, NodeV1SchemaVersion()? The latter feels a bit more natural. And the schema keyword can then be set to ::Union{SchemaVersion, Nothing}.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because Legolas code requires the type, I passed the type. I can make it an instance and use typeof?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok if Legolas works on the type we can keep it as is, but https://github.com/Deltares/Ribasim.jl/pull/43/files#diff-cd7159638f60b3d2b30ed4bbe7b622ffcdc834f0a2beafe12f46e219a4aa932aR69 (sv = schema()) suggests it doesn't.

edge = read_table(config["edge"]; schema = EdgeV1SchemaVersion)
state = read_table(config["state"]; schema = StateV1SchemaVersion)
static = read_table(config["static"]; schema = StaticV1SchemaVersion)
profile = read_table(config["profile"]; schema = ProfileV1SchemaVersion)
forcing = read_table(config["forcing"]; schema = ForcingV1SchemaVersion)

# Validate consistency in the data
@assert is_consistent(node, edge, state, static, profile, forcing) "Data is not consistent"
Comment thread
visr marked this conversation as resolved.

if haskey(config, "ids")
ids = config["ids"]::Vector{Int}
else
# use all ids in the node table if it is not given in the TOML file
ids = Vector{Int}(node.id)
end
@debug "Using $(length(ids)) nodes"

# keep only IDs we use
node = filter(:id => in(ids), node; view = true)
Expand All @@ -29,15 +34,16 @@ function load_data(config::Dict, starttime::DateTime, endtime::DateTime)
profile = filter(:id => in(ids), profile; view = true)

# for forcing first get the right time range out
@assert issorted(forcing.time)
startrow = searchsortedfirst(forcing.time, starttime)
endrow = searchsortedlast(forcing.time, endtime)
forcing = @view forcing[startrow:endrow, :]
# then keep only IDs we use
forcing = filter(:id => in(ids), forcing; view = true)
Comment on lines +37 to 42
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch that the assert must be moved up because of the searchsorted first.

In general in this function before I only validated the subset that we are using, and now we are validating the whole dataset. If it is cheap enough that is fine, but good to check the cost.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The slowest is indeed validation the forcing on 3M rows, which takes 1.7s.

[ Info: Validating Legolas.SchemaVersion{Symbol("ribasim.node"), 1} for 29616 rows.
  0.019118 seconds (206.28 k allocations: 4.905 MiB)
┌ Warning: unsupported ARROW:extension:name type: "geoarrow.linestring", arrow type = Vector{Tuple{Float64, Float64}}
└ @ Arrow ~/.julia/packages/Arrow/QsQ3U/src/eltypes.jl:53
[ Info: Validating Legolas.SchemaVersion{Symbol("ribasim.edge"), 1} for 39043 rows.
  0.024101 seconds (426.03 k allocations: 10.671 MiB)
[ Info: Validating Legolas.SchemaVersion{Symbol("ribasim.state"), 1} for 8531 rows.
  0.028598 seconds (75.18 k allocations: 1.541 MiB, 76.58% gc time)
[ Info: Validating Legolas.SchemaVersion{Symbol("ribasim.static"), 1} for 5464 rows.
  0.010356 seconds (52.27 k allocations: 1.375 MiB)
[ Info: Validating Legolas.SchemaVersion{Symbol("ribasim.profile"), 1} for 46560 rows.
  0.029184 seconds (601.92 k allocations: 13.447 MiB)
[ Info: Validating Legolas.SchemaVersion{Symbol("ribasim.forcing"), 1} for 3088222 rows.
  1.741670 seconds (33.91 M allocations: 837.849 MiB, 4.59% gc time)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the numbers, I'd say 1.7s is no reason to worry for now, we don't have to optimize further. But perhaps later on if needed we will validate only what we use.


# TODO Is order required for ids?
@assert issorted(profile.id)
@assert issorted(static.id)
Comment thread
evetion marked this conversation as resolved.
@assert issorted(forcing.time)

return (; ids, edge, node, state, static, profile, forcing)
end
Expand Down Expand Up @@ -102,7 +108,7 @@ function create_nodes(node, state, profile, static)::Dictionary{Int, ODESystem}
emptysys = ODESystem(Equation[], t, [], []; name = :empty)
sysdict = Dictionary{Int, ODESystem}(node.id, fill(emptysys, nrow(node)))
# create all node systems
for node in eachrow(node)
for node in Tables.rows(node)
Comment thread
visr marked this conversation as resolved.
sys = node_system(node, state, profile, static)
sysdict[node.id] = sys
end
Expand All @@ -112,7 +118,7 @@ end
"Add connections along edges."
function connect_systems(edge, sysdict)::Vector{Equation}
eqs = Equation[]
for edge in eachrow(edge)
for edge in Tables.rows(edge)
from = getproperty(sysdict[edge.from_id], Symbol(edge.from_connector))
to = getproperty(sysdict[edge.to_id], Symbol(edge.to_connector))
eq = connect(from, to)
Expand Down
98 changes: 98 additions & 0 deletions src/validation.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
@schema "ribasim.node" Node
@schema "ribasim.edge" Edge
@schema "ribasim.state" State
@schema "ribasim.static" Static
@schema "ribasim.profile" Profile
@schema "ribasim.forcing" Forcing

# TODO Ideally, these are structs in a Module
# which we can check, and even derive @named connectors
# from. Since it's unclear how plan B would look like,
# we just hardcode them for now.
const nodetypes = (
"LSW",
"GeneralUser_P",
"LevelControl",
"GeneralUser",
"OutflowTable",
"HeadBoundary",
"Bifurcation",
"LevelLink",
"NoFlowBoundary",
)

# TODO These should be coupled to the nodetypes
const from_connectors = ("b", "dst", "dst_1", "dst_2", "s", "x")
const to_connectors = ("a", "s", "s_a", "src", "x")

@version NodeV1 begin
id::Int
node::String = in(node, nodetypes) ? node : error("Unknown node type $node")
end

@version EdgeV1 begin
from_id::Int
from_connector::String =
in(from_connector, from_connectors) ? from_connector :
error("Unknown from_connector type $from_connector")
to_id::Int
to_connector::String =
in(to_connector, to_connectors) ? to_connector :
error("Unknown to_connector type $to_connector")
end

@version StateV1 begin
id::Int
S::Float64
C::Float64
end

@version StaticV1 begin
id::Int
variable::String = isempty(variable) ? error("Empty variable") : variable
value::Float64
end

@version ProfileV1 begin
id::Int
volume::Float64
area::Float64
discharge::Float64
level::Float64
end

@version ForcingV1 begin
id::Int
time::DateTime
variable::String = isempty(variable) ? error("Empty variable") : variable
value::Float64
end

function is_consistent(node, edge, state, static, profile, forcing)

# Check that node ids exist
# TODO Do we need to check the reverse as well? All ids in use?
Comment thread
evetion marked this conversation as resolved.
ids = node.id
@assert edge.from_id ⊆ ids "Edge from_id not in node ids"
@assert edge.to_id ⊆ ids "Edge to_id not in node ids"
@assert state.id ⊆ ids "State id not in node ids"
@assert static.id ⊆ ids "Static id not in node ids"
@assert profile.id ⊆ ids "Profile id not in node ids"
@assert forcing.id ⊆ ids "Forcing id not in node ids"

# Check edges for uniqueness
for sub in groupby(edge, [:from_id, :to_id])
@assert allunique(sub.from_connector) "Duplicate from_connector in edge $(first(sub.from_id))-$(first(sub.to_id))"
@assert allunique(sub.to_connector) "Duplicate from_connector in edge $(first(sub.from_id))-$(first(sub.to_id))"
end

# TODO Check states

# TODO Check statics

# TODO Check profiles

# TODO Check forcings

true
end
11 changes: 7 additions & 4 deletions test/equations.jl
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ datadir = normpath(@__DIR__, "data")
to_node = ["OutflowTable", "OutflowTable", "LSW"],
to_connector = ["a", "s", "x"],
)
config["static"] = DataFrame(; id = [], variable = [], value = [])
config["static"] = DataFrame(; id = Int64[], variable = String[], value = Float64[])
config["forcing"] =
DataFrame(; time = DateTime[], variable = Symbol[], id = Int[], value = Float64[])
DataFrame(; time = DateTime[], variable = String[], id = Int[], value = Float64[])
config["profile"] = DataFrame(;
id = [id_lsw, id_lsw, id_out, id_out, id_lsw_end, id_lsw_end],
volume = [0.0, 1e6, 0.0, 1e6, 0.0, 1e6],
Expand Down Expand Up @@ -90,8 +90,11 @@ end
to_node = ["GeneralUser", "GeneralUser", "OutflowTable", "OutflowTable", "LSW"],
to_connector = ["x", "s", "a", "s", "x"],
)
config["static"] = DataFrame(; id = [], variable = [], value = [])
config["forcing"] = normpath(datadir, "lhm/forcing.arrow")
config["static"] = DataFrame(; id = Int64[], variable = String[], value = Float64[])
config["forcing"] = subset(
Ribasim.read_table(normpath(datadir, "lhm/forcing.arrow")),
:id => id -> in.(id, Ref(ids)),
)
config["profile"] = DataFrame(;
id = [14784, 14784, 14908, 14908, 14910, 14910],
volume = [0.0, 1e8, 0.0, 1e8, 0.0, 1e8],
Expand Down