diff --git a/packages/lime-system/files/usr/lib/lua/lime/utils.lua b/packages/lime-system/files/usr/lib/lua/lime/utils.lua index 542490f6b..0b1537f1b 100644 --- a/packages/lime-system/files/usr/lib/lua/lime/utils.lua +++ b/packages/lime-system/files/usr/lib/lua/lime/utils.lua @@ -543,4 +543,21 @@ function utils.is_valid_mac(string) end end +function utils.deepcompare(t1,t2) + if t1 == t2 then return true end + local ty1 = type(t1) + local ty2 = type(t2) + if ty1 ~= ty2 then return false end + if ty1 ~= 'table' and ty2 ~= 'table' then return t1 == t2 end + for k1, v1 in pairs(t1) do + local v2 = t2[k1] + if v2 == nil or not utils.deepcompare(v1, v2) then return false end + end + for k2, v2 in pairs(t2) do + local v1 = t1[k2] + if v1 == nil or not utils.deepcompare(v1, v2) then return false end + end + return true +end + return utils diff --git a/packages/shared-state/files/usr/bin/shared-state-multiwriter b/packages/shared-state/files/usr/bin/shared-state-multiwriter new file mode 100755 index 000000000..45c39ee22 --- /dev/null +++ b/packages/shared-state/files/usr/bin/shared-state-multiwriter @@ -0,0 +1,57 @@ +#!/usr/bin/lua +--! Minimalistic CRDT-like shared state structure suitable for mesh networks +--! which handles conflicts in the same entry using last modified timestamp. +--! +--! Copyright (C) 2019 Gioacchino Mazzurco +--! +--! This program is free software: you can redistribute it and/or modify +--! it under the terms of the GNU Affero General Public License version 3 as +--! published by the Free Software Foundation. +--! +--! This program is distributed in the hope that it will be useful, +--! but WITHOUT ANY WARRANTY; without even the implied warranty of +--! MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +--! GNU Affero General Public License for more details. +--! +--! You should have received a copy of the GNU Affero General Public License +--! along with this program. If not, see . + +if type(arg[2]) ~= "string" or arg[2]:len() < 1 then + print (arg[0], "needs CRDT name to be specified as second argument") + os.exit(-22) +end + +local JSON = require("luci.jsonc") +local shared_state = require("shared-state") +require("nixio.util") + +nixio.openlog("shared-state") +local logmask = "info" +if os.getenv("DEBUG") then + logmask = "debug" +end +nixio.setlogmask(logmask) + +local sharedState = shared_state.SharedStateMultiWriter:new(arg[2], nixio.syslog) + +if arg[1] == "insert" then + local inputTable = JSON.parse(io.stdin:read("*all")) or {} + sharedState:insert(inputTable) +elseif arg[1] == "get" then + local resultTable = sharedState:get() + print(JSON.stringify(resultTable)) +elseif arg[1] == "sync" then + local urls = {} + if arg[3] ~= nil then for i=3,#arg do table.insert(urls, arg[i]) end end + sharedState:sync(urls) +elseif arg[1] == "reqsync" then + local inputJson = JSON.parse(io.stdin:read("*all")) + sharedState:merge(inputJson) + print(sharedState:toJsonString()) +else + print(arg[0], "needs an operation name to be specified as first argument") + nixio.closelog() + os.exit(-22) +end + +nixio.closelog() diff --git a/packages/shared-state/files/usr/lib/lua/shared-state.lua b/packages/shared-state/files/usr/lib/lua/shared-state.lua index 6884b582b..309ac3634 100644 --- a/packages/shared-state/files/usr/lib/lua/shared-state.lua +++ b/packages/shared-state/files/usr/lib/lua/shared-state.lua @@ -20,94 +20,31 @@ local fs = require("nixio.fs") local JSON = require("luci.jsonc") local nixio = require("nixio") local uci = require("uci") +local utils = require("lime.utils") local shared_state = {} shared_state.DATA_DIR = '/var/shared-state/data/' +shared_state.PERSISTENT_DATA_DIR = '/etc/shared-state/persistent-data/' shared_state.ERROR_LOCK_FAILED = 165 +shared_state.CANDIDATE_NEIGHBORS_BIN = '/usr/bin/shared-state-get_candidates_neigh' -local SharedState = {} - -function SharedState:new(dataType, logger) - --! Name of the CRDT is mandatory - if type(dataType) ~= "string" or dataType:len() < 1 then - return - end - local logger = (type(logger) == "function") and logger or function() end - local ss = { - dataType = dataType, - log = logger, - --! Map - --! bleachTTL is the count of how much bleaching should occur before the - --! entry expires - --! author is the name of the host who generated that entry - --! data is the value of the entry - storage={}, - --! true if self_storage has changed after loading - changed=false, - -- File descriptor of the persistent file storage - storageFD=nil, - --! true when persistent storage file is locked by this instance - locked=false, - dataFile = shared_state.DATA_DIR..dataType..".json", - hooksDir = "/etc/shared-state/hooks/"..dataType.."/" - } - setmetatable(ss, self) - self.__index = SharedState - return ss -end +local SharedStateBase = {} -function SharedState:_bleach() - local substancialChange = false - for k,v in pairs(self.storage) do - if(v.bleachTTL < 2) then - self.storage[k] = nil - substancialChange = true - else - v.bleachTTL = v.bleachTTL-1 +function SharedStateBase:load(mergeWithCurrentState) + local onDiskData = JSON.parse(self.storageFD:readall()) or {} + if mergeWithCurrentState then + self:_merge(onDiskData) + else + for key, value in pairs(onDiskData) do + self.storage[key] = value end - self.changed = true end - return substancialChange -end - -function SharedState:bleach() - self:lock() - self:load() - local shouldNotify = self:_bleach() - self:save() - self:unlock() - --! Avoid hooks being called if data hasn't substantially changed - if(shouldNotify) then self:notifyHooks() end end -function SharedState:_insert(key, data, bleachTTL) - bleachTTL = bleachTTL or 30 - self.storage[key] = { - bleachTTL=bleachTTL, - author=io.input("/proc/sys/kernel/hostname"):read("*line"), - data=data - } - self.changed = true -end - -function SharedState:insert(data, bleachTTL) - self:lock() - self:load() - for key, lv in pairs(data) do self:_insert(key, lv, bleachTTL) end - self:save() - self:unlock() - self:notifyHooks() -end - -function SharedState:load() - self:_merge(JSON.parse(self.storageFD:readall()), false) -end - -function SharedState:lock(maxwait) +function SharedStateBase:lock(maxwait) if self.locked then return end maxwait = maxwait or 10 - - fs.mkdirr(shared_state.DATA_DIR) + fs.mkdirr(fs.dirname(self.dataFile)) self.storageFD = nixio.open( self.dataFile, nixio.open_flags("rdwr", "creat") ) @@ -126,39 +63,16 @@ function SharedState:lock(maxwait) end end -function SharedState:_merge(stateSlice, notifyInsert) - local stateSlice = stateSlice or {} - if(notifyInsert == nil) then notifyInsert = true end - - for key,rv in pairs(stateSlice) do - if rv.bleachTTL <= 0 then - self.log( "debug", "sharedState:merge got expired entry" ) - self.changed = true - else - local lv = self.storage[key] - if( lv == nil ) then - self.storage[key] = rv - self.changed = self.changed or notifyInsert - elseif ( lv.bleachTTL < rv.bleachTTL ) then - self.log( "debug", "Updating entry for: "..key.." older: ".. - lv.bleachTTL.." newer: "..rv.bleachTTL ) - self.storage[key] = rv - self.changed = self.changed or notifyInsert - end - end - end -end - -function SharedState:merge(stateSlice, notifyInsert) +function SharedStateBase:merge(stateSlice) self:lock() self:load() - self:_merge(stateSlice, notifyInsert) + self:_merge(stateSlice) self:save() self:unlock() self:notifyHooks() end -function SharedState:notifyHooks() +function SharedStateBase:notifyHooks() if self.changed then local jsonString = self:toJsonString() if not fs.dir(self.hooksDir) then return end @@ -170,21 +84,7 @@ function SharedState:notifyHooks() end end -function SharedState:_remove(key) - if(self.storage[key] ~= nil and self.storage[key].data ~= nil) - then self:_insert(key, nil) end -end - -function SharedState:remove(keys) - self:lock() - self:load() - for _,key in ipairs(keys) do self:_remove(key) end - self:save() - self:unlock() - self:notifyHooks() -end - -function SharedState:save() +function SharedStateBase:save() if self.changed then local outFd = io.open(self.dataFile, "w") outFd:write(self:toJsonString()) @@ -193,7 +93,7 @@ function SharedState:save() end end -function SharedState:httpRequest(url, body) +function SharedStateBase:httpRequest(url, body) local tmpfname = os.tmpname() local tmpfd = io.open(tmpfname, "w") @@ -212,7 +112,7 @@ function SharedState:httpRequest(url, body) return value end -function SharedState:_sync(urls) +function SharedStateBase:_sync(urls) urls = urls or {} if #urls < 1 then @@ -225,11 +125,11 @@ function SharedState:_sync(urls) line.."/"..self.dataType ) end - io.input(io.popen(arg[0].."-get_candidates_neigh")) + io.input(io.popen(shared_state.CANDIDATE_NEIGHBORS_BIN)) for line in io.lines() do table.insert( urls, - "http://["..line.."]/cgi-bin/shared-state/"..self.dataType ) + self:getSyncUrl(line, self.dataType)) end end @@ -247,30 +147,30 @@ function SharedState:_sync(urls) end end -function SharedState:sync(urls) +function SharedStateBase:sync(urls) self:lock() self:load() self:unlock() self:_sync(urls) self:lock() - self:load() -- Take in account changes happened during sync + self:load(true) -- Take in account changes happened during sync self:save() self:unlock() self:notifyHooks() end -function SharedState:toJsonString() +function SharedStateBase:toJsonString() return JSON.stringify(self.storage) end -function SharedState:get() +function SharedStateBase:get() self:lock() self:load() self:unlock() return self.storage end -function SharedState:unlock() +function SharedStateBase:unlock() if not self.locked then return end self.storageFD:lock("ulock") self.storageFD:close() @@ -278,5 +178,177 @@ function SharedState:unlock() self.locked = false end +function createSharedStateBase(dataType, logger, dataFile) + local logger = (type(logger) == "function") and logger or function() end + local newInstance = { + dataType = dataType, + log = logger, + --! Map + --! bleachTTL is the count of how much bleaching should occur before the + --! entry expires + --! author is the name of the host who generated that entry + --! data is the value of the entry + storage={}, + --! true if self_storage has changed after loading + changed=false, + -- File descriptor of the persistent file storage + storageFD=nil, + --! true when persistent storage file is locked by this instance + locked=false, + dataFile = dataFile, + hooksDir = "/etc/shared-state/hooks/"..dataType.."/" + } + return newInstance +end + +local SharedState = {} +setmetatable(SharedState, {__index = SharedStateBase}) + +function SharedState:new(dataType, logger) + local dataFile = shared_state.DATA_DIR..dataType..".json" + local newInstance = createSharedStateBase(dataType, logger, dataFile) + setmetatable(newInstance, {__index = SharedState}) + return newInstance +end + +function SharedState:_bleach() + local substancialChange = false + for k,v in pairs(self.storage) do + if(v.bleachTTL < 2) then + self.storage[k] = nil + substancialChange = true + else + v.bleachTTL = v.bleachTTL-1 + end + self.changed = true + end + return substancialChange +end + +function SharedState:bleach() + self:lock() + self:load() + local shouldNotify = self:_bleach() + self:save() + self:unlock() + --! Avoid hooks being called if data hasn't substantially changed + if(shouldNotify) then self:notifyHooks() end +end + +function SharedState:_insert(key, data, bleachTTL) + bleachTTL = bleachTTL or 30 + self.storage[key] = { + bleachTTL=bleachTTL, + author=io.input("/proc/sys/kernel/hostname"):read("*line"), + data=data + } + self.changed = true +end + +function SharedState:insert(data, bleachTTL) + self:lock() + self:load() + for key, lv in pairs(data) do self:_insert(key, lv, bleachTTL) end + self:save() + self:unlock() + self:notifyHooks() +end + +function SharedState:_merge(stateSlice) + local stateSlice = stateSlice or {} + for key,rv in pairs(stateSlice) do + if rv.bleachTTL <= 0 then + self.log( "debug", "sharedState:merge got expired entry" ) + self.changed = true + else + local lv = self.storage[key] + if( lv == nil or lv.bleachTTL < rv.bleachTTL ) then + self.log( "debug", "Updating entry for: "..key.." older: ".. + (lv and lv.bleachTTL or 'no entry').." newer: "..rv.bleachTTL ) + self.storage[key] = rv + self.changed = true + end + end + end +end + +function SharedState:_remove(key) + if(self.storage[key] ~= nil and self.storage[key].data ~= nil) + then self:_insert(key, nil) end +end + +function SharedState:remove(keys) + self:lock() + self:load() + for _,key in ipairs(keys) do self:_remove(key) end + self:save() + self:unlock() + self:notifyHooks() +end + +function SharedState:getSyncUrl(host) + return "http://["..host.."]/cgi-bin/shared-state/"..self.dataType +end + + +local SharedStateMultiWriter = {} +setmetatable(SharedStateMultiWriter, {__index = SharedStateBase}) + +function SharedStateMultiWriter:new(dataType, logger) + local dataFile = shared_state.PERSISTENT_DATA_DIR..dataType..".json" + local newInstance = createSharedStateBase(dataType, logger, dataFile) + setmetatable(newInstance, {__index = SharedStateMultiWriter}) + return newInstance +end + + +function SharedStateMultiWriter:_merge(stateSlice) + --! Make merge based on an incremental counter (changes) and a random number (fortune) + local stateSlice = stateSlice or {} + for key,rv in pairs(stateSlice) do + local lv = self.storage[key] + if ( lv == nil or lv.changes < rv.changes or + ( lv.changes == rv.changes and lv.fortune < rv.fortune )) then + self.log( "debug", "Updating entry for: "..key.." older: ".. + (lv and lv.changes or 'no entry') .." newer: "..rv.changes ) + self.storage[key] = rv + self.changed = true + end + end +end + +function SharedStateMultiWriter:insert(data) + self:lock() + self:load() + for key, lv in pairs(data) do self:_insert(key, lv) end + self:save() + self:unlock() + self:notifyHooks() +end + +function shared_state._getFortune() + return math.random(1, 100000) +end + +function SharedStateMultiWriter:_insert(key, data) + local lv = self.storage[key] + if (lv == nil or not utils.deepcompare(lv.data, data)) then + local changes = lv and lv.changes + 1 or 0 + self.storage[key] = { + lastModified=os.time(), + changes=changes, + fortune=shared_state._getFortune(), + author=io.input("/proc/sys/kernel/hostname"):read("*line"), + data=data + } + self.changed = true + end +end + +function SharedStateMultiWriter:getSyncUrl(host) + return "http://["..host.."]/cgi-bin/shared-state-multiwriter/"..self.dataType +end + shared_state.SharedState = SharedState +shared_state.SharedStateMultiWriter = SharedStateMultiWriter return shared_state diff --git a/packages/shared-state/files/www/cgi-bin/shared-state-multiwriter b/packages/shared-state/files/www/cgi-bin/shared-state-multiwriter new file mode 100755 index 000000000..3fca00f7d --- /dev/null +++ b/packages/shared-state/files/www/cgi-bin/shared-state-multiwriter @@ -0,0 +1,29 @@ +#!/bin/sh + +< + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License version 3 as +published by the Free Software Foundation. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . + +LICENSE + +echo "Content-Type: application/json" +echo + +[ "$REQUEST_METHOD" != "POST" ] && exit 22 +[ -z "$PATH_INFO" ] && exit 22 +PATH_INFO="${PATH_INFO#/}" +echo "$PATH_INFO" | grep -q '^[a-zA-Z0-9_-]*$' || exit 22 + +cat - | shared-state-multiwriter reqsync "$PATH_INFO" diff --git a/packages/shared-state/tests/test_shared_state.lua b/packages/shared-state/tests/test_shared_state.lua index 8b1e8fbb5..adfe6fd0c 100644 --- a/packages/shared-state/tests/test_shared_state.lua +++ b/packages/shared-state/tests/test_shared_state.lua @@ -116,3 +116,80 @@ describe('LiMe Utils tests #sharedstate', function() end) end) + +describe('Shared State MultiWriter tests #sharedstatemultiwriter', function() + before_each('', function() + test_dir = test_utils.setup_test_dir() + shared_state.PERSISTENT_DATA_DIR = test_dir + end) + + after_each('', function() + test_utils.teardown_test_dir() + end) + + it('test insert new data', function() + local ss = shared_state.SharedStateMultiWriter:new('foo') + local sharedStateB = shared_state.SharedStateMultiWriter:new('B') + stub(shared_state, "_getFortune", function () return 100 end) + ss:insert({ bar = '123'}) + local db = ss:get() + assert.is.equal('123', db.bar.data) + assert.is.equal(0, db.bar.changes) + assert.is.equal(100, db.bar.fortune) + end) + + it('test merge data from remote db with more changes (newer)', function() + local sharedStateA = shared_state.SharedStateMultiWriter:new('A') + local sharedStateB = shared_state.SharedStateMultiWriter:new('B') + sharedStateA:insert({ bar = 'foo', baz = 'qux', zig = 'old_zag'}) + sharedStateB:insert({ zig = 'new_zag'}) + sharedStateB:insert({ zig = 'newer_zag'}) + sharedStateA:merge(sharedStateB:get()) + local dbA = sharedStateA:get() + assert.is.equal('foo', dbA.bar.data) + assert.is.equal('qux', dbA.baz.data) + assert.is.equal('newer_zag', dbA.zig.data) + end) + + it('test merge data from remote db with less changes (older)', function() + local sharedStateA = shared_state.SharedStateMultiWriter:new('A') + local sharedStateB = shared_state.SharedStateMultiWriter:new('B') + sharedStateA:insert({ bar = 'foo', baz = 'qux', zig = 'zag'}) + sharedStateA:insert({ zig = 'new_zag'}) + sharedStateB:insert({ zig = 'old_zag'}) + sharedStateA:merge(sharedStateB:get()) + local dbA = sharedStateA:get() + assert.is.equal('foo', dbA.bar.data) + assert.is.equal('qux', dbA.baz.data) + assert.is.equal('new_zag', dbA.zig.data) + end) + + it('test merge data from remote db with same changes but more luck', function() + local sharedStateA = shared_state.SharedStateMultiWriter:new('A') + local sharedStateB = shared_state.SharedStateMultiWriter:new('B') + stub(shared_state, "_getFortune", function () return 100 end) + sharedStateA:insert({ bar = 'foo', baz = 'qux', zig = 'old_zag'}) + stub(shared_state, "_getFortune", function () return 1000 end) + sharedStateB:insert({ zig = 'new_zag'}) + sharedStateA:merge(sharedStateB:get()) + local dbA = sharedStateA:get() + assert.is.equal('foo', dbA.bar.data) + assert.is.equal('qux', dbA.baz.data) + assert.is.equal('new_zag', dbA.zig.data) + end) + + it('test merge data from remote db with same changes but less luck', function() + local sharedStateA = shared_state.SharedStateMultiWriter:new('A') + local sharedStateB = shared_state.SharedStateMultiWriter:new('B') + stub(shared_state, "_getFortune", function () return 1000 end) + sharedStateA:insert({ bar = 'foo', baz = 'qux', zig = 'new_zag'}) + stub(shared_state, "_getFortune", function () return 100 end) + sharedStateB:insert({ zig = 'old_zag'}) + sharedStateA:merge(sharedStateB:get()) + local dbA = sharedStateA:get() + assert.is.equal('foo', dbA.bar.data) + assert.is.equal('qux', dbA.baz.data) + assert.is.equal('new_zag', dbA.zig.data) + end) + +end)