Hello, I came across Snejk's thread about threading in Lua. Today I want to share another approach to this topic.
It has one simple advantage: inside such a function you can pause execution. It uses addEvent for that purpose, and it's quite efficient.
Here is a really simple demonstration.
If someone wants to check out sn3ejk's approach: link
- - - Updated - - -
Sample Script :
@Edit
I have rewritten this and added waiting on signals. I wrote a sample bomb script with detonators to shed some light on this.
Setting up bomb script :
Detonator script:
Producer-consumer problem.
I recalled this problem from my class and wondered how the alpha version of this lib would handle it. Back then it could not handle it at all, which inspired me to write this update.
We have two threads: one produces values and the other does something with the values that have been produced. We don't want to stop either of them, only to switch execution between them. That is what the code below provides.
Change Log:
*added possibility to send signals between threads,
* better handling of threads in general.
* added singleton class to handle all running threads.
* Producer-consumer problem implemented as an example
[/code]
Lua:
--- Prototype table for a single cooperative task ("thread") backed by a coroutine.
TaskHandler = {}

--- Create a new task around `callback`.
-- The coroutine is only created here, never resumed: nothing runs until
-- TaskHandler:continue is called on the returned object.
-- Extra arguments are accepted for backward compatibility but are not
-- stored or forwarded by this constructor (coroutine.create only takes the
-- function, so the original second argument was silently ignored).
-- @param callback function to run inside the coroutine; its environment is
--        replaced via setEnvironment before the coroutine is created
-- @param ... ignored
-- @return the new task object (metatable: the table `new` was called on)
function TaskHandler:new(callback, ...)
    local obj = {}
    self.__index = self
    setmetatable(obj, self)
    obj.idle_loops = 0          -- incremented by continue() on resumes that fall through
    obj.waiting_signals = {}    -- signal name -> truthy while the task is blocked on it
    obj.pending_signals = {}    -- queue of {sig = name, arg = {...}} not yet consumed
    obj.parameters = {}
    obj.__debug = nil           -- set to any non-nil value to enable TaskHandler:debug output
    obj.setEnvironment(obj, callback)
    obj.coroutine = coroutine.create(callback)
    return obj
end
--- Print a diagnostic line for this task when debugging is enabled.
-- Output is produced only while `self.__debug` is non-nil.
-- @param ... values printed after the "[Thread : <name>" prefix
-- @return the result of `print` when debugging is on, nothing otherwise
function TaskHandler:debug(...)
    if self.__debug ~= nil then
        -- Forward the varargs directly; the implicit `arg` table is a Lua 5.0
        -- compatibility feature and is not available on every Lua 5.1 build.
        return print("[Thread : "..tostring(self.name), ...)
    end
end
--- Mark `name` as a signal this task is waiting for.
-- @param name key stored in `waiting_signals` (value is set to true)
function TaskHandler:addWaitSignal(name)
    local awaited = self.waiting_signals
    awaited[name] = true
end
--- Stop waiting for the signal `name`.
-- @param name key removed from `waiting_signals`
function TaskHandler:releaseSignal(name)
    local awaited = self.waiting_signals
    awaited[name] = nil
end
--- Give the task a name (shown by TaskHandler:debug).
-- @param name value stored in the `name` field
function TaskHandler:setName(name)
    self["name"] = name
end
--- Put this task (back) into the scheduler's thread table.
-- Asks g_threads to assign an id, stores the task under that id and, unless
-- suppressed, sends "signal_awake" to the scheduler task.
-- @param preventsignal pass any non-nil value to skip the wake-up signal
function TaskHandler:recovery(preventsignal)
    g_threads:setId(self)
    local slot = self:getId()
    g_threads.threads[slot] = self
    if preventsignal ~= nil then
        return
    end
    g_threads.signalhandler:sendSignal("signal_awake")
end
--- Take this task out of the scheduler's thread table.
-- The task's id is appended to g_threads.releasedIds so it can be handed out
-- again, and the table slot for that id is cleared.
function TaskHandler:suspend()
    local id = self:getId()
    table.insert(g_threads.releasedIds, id)
    g_threads.threads[id] = nil
end
--- Pause the running task for `delay`.
-- Must be called from inside the task's coroutine: the task is removed from
-- the scheduler and then yields ("wait", delay); TaskHandler:continue reacts
-- to that yield by scheduling the recovery with addEvent.
-- @param delay delay handed to addEvent by continue()
function TaskHandler:wait(delay)
    local note = "---WAITING : "..tostring(delay)
    self:debug(note)
    -- temporarily remove from the main scheduling loop
    self:suspend()
    coroutine.yield("wait", delay)
end
--- Block the running task until one of the named signals has been received.
-- Every non-numeric string argument is registered in `waiting_signals`. If a
-- signal is already queued in `pending_signals` it is consumed immediately;
-- if the task is then still waiting for something it yields
-- ("wait_signal", names) and returns whatever it is resumed with.
-- @param ... signal names (values that are not non-numeric strings are skipped)
-- @return the arguments carried by the consumed pending signal, or the values
--         passed to the resume that wakes the task up
function TaskHandler:wait_signal(...)
    self:debug("---SIGNAL AWAITING HERE --- ")
    -- Explicit capture instead of the legacy implicit `arg` table (whose
    -- extra `n` field was also visited by pairs()).
    local names = {...}
    for _, v in pairs(names) do
        if type(v) == "string" and tonumber(v) == nil then
            self.waiting_signals[v] = v
        end
    end
    -- Maybe we received a signal before we asked for it?
    local lastArg = self:handlePendingSignal()
    self:debug("sig wait: "..table.maxv(self.waiting_signals))
    self:debug("sig pending: "..table.maxv(self.pending_signals))
    if table.maxv(self.waiting_signals) > 0 then
        self:debug("---SIGNAL AWAITING OUT ---yielded ")
        return coroutine.yield("wait_signal", names)
    end
    -- An `a and unpack(t) or nil` expression would truncate the result to a
    -- single value; return every argument of the consumed signal instead.
    if lastArg then
        return unpack(lastArg)
    end
    return nil
end
--- Deliver `signal` (plus optional payload) to this task.
-- When called from inside a coroutine the signal is only queued in
-- `pending_signals`; when called from the main thread the signal is released
-- and the task is resumed right away with the payload.
-- Non-string signals are ignored.
-- @param signal signal name
-- @param ... payload handed to the waiting task
function TaskHandler:sendSignal(signal, ...)
    self:debug("-----Sending signal--------"..tostring(signal))
    self:debug("--- status: "..tostring(self.waiting_signals[signal]))
    --self:debug("---amount of waiting signls: "..table.maxv(self.waiting_signals))
    self:debug("-------------------------")
    if type(signal) == "string" then
        self:debug("Awaiting for signals :"..tostring(table.maxv(self.waiting_signals) - 1))
        -- In Lua 5.1 coroutine.running() returns nil on the main thread.
        if coroutine.running() ~= nil then
            -- Explicit capture instead of the legacy implicit `arg` table.
            self:addPendingSignal(signal, {...})
        else
            self:releaseSignal(signal)
            self:continue(...)
        end
    end
end
--- Queue a signal for later consumption by handlePendingSignal.
-- @param signal signal name
-- @param arg table holding the signal's payload (unpacked for the debug line)
function TaskHandler:addPendingSignal(signal, arg)
    self:debug("-----Arguments of signal", unpack(arg))
    local entry = {sig = signal, arg = arg}
    table.insert(self.pending_signals, entry)
end
--- Consume the oldest queued signal, if the task is waiting for anything.
-- The head of `pending_signals` is taken regardless of its name; that name is
-- released from `waiting_signals` and the entry is removed from the queue.
-- @return the payload table of the consumed signal, or nil when the task is
--         not waiting for any signal or nothing is queued
function TaskHandler:handlePendingSignal()
    -- The task is not waiting for any signal: nothing to do.
    if table.maxv(self.waiting_signals) == 0 then
        return nil
    end
    local queue = self.pending_signals
    self:debug("Handling pending signals "..table.maxv(queue))
    local head = queue[1]
    if head ~= nil then
        self:releaseSignal(head.sig)
        self:debug("sig pending: "..table.maxv(queue))
        table.remove(queue, 1)
        self:debug("sig pending after removal: "..table.maxv(queue))
        return head.arg
    end
    return nil
end
--- Resume the task's coroutine and react to whatever it yields.
-- The coroutine is resumed only if it is not dead and the task is not waiting
-- for any signal. The first yielded value selects the reaction:
--   "wait_signal"   - nothing to do here, the task stays blocked
--   "wait"          - schedule self:recovery() with addEvent after the delay
--   "suspend"       - nothing to do
--   "switch_thread" - resume another task with the supplied arguments
--   "escape"        - run a callback outside the coroutine and resume the
--                     task with its results
-- A dead coroutine causes self:kill() to be called.
-- @param ... values passed to coroutine.resume
-- @return `idle_loops`, except on the yield kinds that return early
--         (those return nothing)
function TaskHandler:continue(...)
    if coroutine.status(self.coroutine) ~= 'dead' and table.maxv(self.waiting_signals) == 0 then
        -- Varargs are forwarded directly instead of through the legacy `arg` table.
        local ok, sig_type, signals = coroutine.resume(self.coroutine, ...)
        if not ok then
            -- On failure `sig_type` holds the error message.
            print("COROUTINE ERROR")
            print(debug.traceback(self.coroutine, sig_type))
        end
        if sig_type == "wait_signal" and signals ~= nil then
            for k, v in pairs(signals) do
                -- Print the yielded value itself (the loop variable), not an
                -- unrelated undefined global.
                self:debug("[:continue] parameters of yield .. "..k..":"..tostring(v))
            end
            return
        end
        if sig_type == "wait" and tonumber(signals) ~= nil then
            -- Schedule putting this task back into the scheduler's thread table.
            addEvent(function()
                self:recovery()
            end, tonumber(signals))
            return
        end
        if sig_type == "suspend" then
            --self:continue()
            return
        end
        if sig_type == "switch_thread" then
            signals.thread:continue(unpack(signals.arg))
            return
        end
        if sig_type == "escape" then
            self:debug("Escapinf from thread.. executing callback()")
            local callback = signals.callback
            self:continue(callback(unpack(signals.arg)))
        end
        self.idle_loops = self.idle_loops + 1
    end
    if coroutine.status(self.coroutine) == 'dead' then
        self:kill()
    end
    return self.idle_loops
end
--- Run the task's end hook and unregister it from g_threads.
-- Calls `self.onEnd` when it is a function, removes the per-player entry when
-- `playerExecuter` is set, and clears the id slot when `id` is truthy.
function TaskHandler:kill()
    local on_end = self.onEnd
    if type(on_end) == "function" then
        on_end()
    end
    local owner = self.playerExecuter
    if owner ~= nil then
        g_threads.threads[owner][self.name] = nil
    end
    if self.id then
        g_threads.threads[self.id] = nil
    end
end
--- Replace the environment of `func` with a sandbox bound to this task.
-- The sandbox falls back to _G for every other name and adds the helpers
-- `suspend`, `wait_signal`, `wait` and `escape`, which operate on this task.
-- Note that setfenv changes the function object itself, so the helpers stay
-- bound to the task that called setEnvironment last for that function.
-- @param func function whose environment is replaced (Lua 5.1 `setfenv`)
function TaskHandler:setEnvironment(func)
    local sandbox = {}
    sandbox.print = print
    sandbox.suspend = function()
        self:suspend()
    end
    setmetatable(sandbox, {__index = _G})
    sandbox.wait_signal = function(...)
        -- Explicit capture instead of the legacy implicit `arg` table.
        local names = {...}
        self:debug("Awaiting thread signal, name:"..tostring(table.maxv(names)))
        return self:wait_signal(...)
    end
    sandbox.wait = function(delay)
        self:wait(delay)
    end
    -- Yield to continue(), which calls `callback(...)` outside the coroutine.
    sandbox.escape = function(callback, ...)
        coroutine.yield("escape", {callback = callback, arg = {...}})
    end
    setfenv(func, sandbox)
end
--- Store the scheduler id of this task.
-- @param id value kept in the `id` field
function TaskHandler:setId(id)
    local tag = "[TaskHandler:setId]"
    self:debug(tag)
    self.id = id
end
--- Read the scheduler id of this task.
-- @return the `id` field (nil until an id has been set)
function TaskHandler:getId()
    local id = self.id
    return id
end
------------------------
------------------------
------------------------
--- Prototype table for the registry that keeps track of all tasks.
ThreadsHandler = {}

--- Create a registry with no threads and no released ids.
-- @return the new registry object (metatable: the table `new` was called on)
function ThreadsHandler:new()
    self.__index = self
    return setmetatable({
        threads = {},
        waiting_threads = 0,
        releasedIds = {},
    }, self)
end
--- Start the registry by creating and launching the scheduler task.
function ThreadsHandler:run()
    self.signalHandler(self)
end
--- Assign an id to `thread`.
-- A previously released id is reused when one is available; otherwise the id
-- is table.maxv(self.threads) + 1.
-- @param thread task receiving the id through thread:setId
-- @return true when a released id was reused, nothing otherwise
function ThreadsHandler:setId(thread)
    local free = self.releasedIds
    if table.maxv(free) > 0 then
        local reused = free[1]
        table.remove(free, 1)
        thread:setId(reused)
        return true
    end
    local fresh = table.maxv(self.threads) + 1
    thread:setId(fresh)
end
--- Look up a named task registered for a player.
-- @param cid creature id passed to getPlayerName
-- @param name task name used at registration
-- @return the task, or nil when the player has no entry or no such task
function ThreadsHandler:getThread(cid, name)
    local owned = self.threads[getPlayerName(cid)]
    if owned ~= nil then
        return owned[name]
    end
    return nil
end
--- Create a task for `callback` and register it with the scheduler.
-- The extra arguments are stored as one pack (a table) at the end of
-- `thread.parameters`. The task is registered through thread:recovery(true),
-- i.e. without sending "signal_awake" to the scheduler.
-- @param callback function to run as a task
-- @param ... initial arguments for the task
-- @return the new task
function ThreadsHandler:create(callback, ...)
    -- Varargs are used directly instead of the legacy implicit `arg` table.
    local thread = TaskHandler:new(callback, ...)
    -- Queue the parameters intended for the first resume.
    table.insert(thread.parameters, {...})
    thread.__debug = nil
    thread:recovery(true)
    return thread
end
--- Placeholder: visits every registered entry but computes and returns nothing.
-- NOTE(review): `waiting_signals_count` is not assigned anywhere in the
-- visible code; getWaitingThreads looks like the working counterpart.
function ThreadsHandler:countWaitingThreads()
    for _, entry in pairs(self.threads) do
        local _ = entry.waiting_signals_count
    end
end
--- Create and start the scheduler task, stored in `self.signalhandler`.
-- The scheduler body runs as a TaskHandler coroutine, so the bare names
-- `wait_signal` and `escape` used inside it are the sandbox helpers installed
-- by TaskHandler:setEnvironment. It loops forever: it sleeps on the
-- "signal_awake" signal when every registered thread is waiting, otherwise it
-- resumes each thread in turn.
function ThreadsHandler:signalHandler()
-- Prevent infinite execution: a thread whose idle_loops counter reaches this
-- limit is suspended (removed from the thread table) and its counter reset.
local suspentionLimit = 10
local func = function()
print("SIGNAL HANDLER")
-- `iterator` counts loop passes; it is not read anywhere in this function.
local iterator = 0
-- Number of threads that had queued signals during the previous pass.
local pending = 0
while true do
iterator = iterator + 1
local size = table.maxv(self.threads)
--print("Status -- waiting threads : ".. g_threads:getWaitingThreads() .. " All threads: "..size)
if g_threads:getWaitingThreads() == size then
-- This condition provides one extra pass over the threads, to make sure all pending signals were handled.
-- Warning! If you send signals that are never handled, it might cause infinite loops.
if pending == 0 then
wait_signal("signal_awake")
else
pending = 0
end
end
-- Update the size variable (threads may have been added while waiting).
size = table.maxv(self.threads)
-- ipairs only walks the contiguous integer ids starting at 1.
for k, thread in ipairs(self.threads) do
-- A thread might have received signals before it was even started, so check its pending box.
escape(function()
-- Threads should be handled "outside" of the scheduler coroutine; that is why escape is used
-- (the callback is invoked by TaskHandler:continue after the scheduler yields).
if table.maxv(thread.pending_signals) > 0 then
pending = pending + 1
end
thread:debug("Thread execute begin")
thread:debug("Suspention: 1)"..tostring(thread.idle_loops).."2)"..tostring(suspentionLimit))
local lastArg = thread:handlePendingSignal()
-- Drops the oldest queued parameter pack; its contents are not used here.
table.remove(thread.parameters,1)
-- When no signal was consumed the thread is resumed with the single value `false`.
-- The result stored in `suspend` is not read afterwards.
local suspend = thread:continue(lastArg ~= nil and unpack(lastArg))
thread:debug("Thread execute ended")
if thread.idle_loops >= suspentionLimit then
-- Temporarily remove from the scheduler.
thread:suspend()
-- Reset the idle loops counter.
thread.idle_loops = 0
end
--TaskHandler:handlePendingSignals()
end )
end
-- Keep looping while some thread can still run or signals were pending;
-- otherwise sleep until "signal_awake" arrives.
if g_threads:getWaitingThreads() < size and size > 0 or pending > 0 then
else
wait_signal("signal_awake")
end
end
end
self.signalhandler = TaskHandler:new(func)
-- First resume: runs the scheduler body until its first yield.
self.signalhandler:continue()
end
--- Placeholder with no behaviour: both arguments are ignored.
-- @param name unused
-- @param thread unused
function ThreadsHandler:addThread(name, thread)
    -- intentionally empty
end
--- Register `thread` under a player and a name, then start it.
-- Nothing happens when the player already has a task with that name.
-- @param cid creature id passed to getPlayerName
-- @param name task name, unique per player
-- @param thread task to register and resume
-- @return true when the task was registered and resumed, false when a task
--         with that name already exists for the player
function ThreadsHandler:addPlayerThread(cid, name, thread)
    if self:getThread(cid, name) ~= nil then
        return false
    end
    local player = getPlayerName(cid)
    if self.threads[player] == nil then
        self.threads[player] = {}
    end
    thread.name = name
    thread.playerExecuter = player
    self.threads[player][name] = thread
    -- ThreadsHandler:create stores the initial arguments as ONE table inside
    -- thread.parameters, so unpacking `parameters` itself would hand that
    -- table to the callback as its first argument. Unpack the stored pack.
    -- NOTE(review): a flat `parameters` list whose first element is itself a
    -- table cannot be told apart from a pack - confirm no caller builds one.
    local first_pack = thread.parameters[1]
    if type(first_pack) == "table" then
        thread:continue(unpack(first_pack))
    else
        thread:continue(unpack(thread.parameters))
    end
    return true
end
--- Count the registered entries that are waiting for at least one signal.
-- @return number of entries whose `waiting_signals` table is non-empty
--         according to table.maxv
function ThreadsHandler:getWaitingThreads()
    local blocked = 0
    for _, task in pairs(self.threads) do
        local awaiting = table.maxv(task.waiting_signals)
        if awaiting > 0 then
            blocked = blocked + 1
        end
    end
    return blocked
end
--- Remove a player's named task from the registry.
-- Does nothing when the player has no entry or no task with that name.
-- @param cid creature id passed to getPlayerName
-- @param name task name used at registration
function ThreadsHandler:killThread(cid, name)
    local owned = self.threads[getPlayerName(cid)]
    if owned == nil then
        return
    end
    local thread = owned[name]
    if thread == nil then
        return
    end
    owned[name] = nil
    -- No `debug` method is defined on ThreadsHandler in the visible code, so
    -- the message is logged through the removed task's own debug method.
    if type(thread.debug) == "function" then
        thread:debug("Killed a thread")
    end
    self.waiting_threads = self.waiting_threads - 1
end
-- Global registry used by every TaskHandler; created once when this file loads.
g_threads = ThreadsHandler:new()
-- Creates the scheduler task and resumes it for the first time.
g_threads:run()
It has one simple advantage: inside such a function you can pause execution. It uses addEvent for that purpose, and it's quite efficient.
Here is a really simple demonstration.
Lua:
--- Demo task: says two messages one second apart, then speaks every extra
-- argument with a three second pause after each.
-- `wait` is the helper installed in the task's environment, so this function
-- has to run as a task.
-- @param cid creature that speaks
-- @param ... texts to say, in order
function testEntryPoint(cid, ...)
    doCreatureSay(cid,"Delaying message",1)
    wait(1000) -- delays for a second
    doCreatureSay(cid,"Woot, it gots delayed :D",1)
    -- Iterate the varargs in order; pairs() over the legacy `arg` table also
    -- visited its `n` counter and gave no ordering guarantee.
    for _, text in ipairs({...}) do
        doCreatureSay(cid, text, 1) -- speaks out every additional argument passed to the function
        wait(3000) -- delays 3 seconds
    end
end
-- NOTE(review): TaskHandler:new as defined in this post only builds the
-- coroutine and does not resume it - confirm how this demo task is meant to
-- be started (e.g. through g_threads:create plus a "signal_awake" signal).
local core = TaskHandler:new(testEntryPoint, cid, "text1","text2")
If someone wants to check out sn3ejk's approach: link
- - - Updated - - -
Sample Script :
Lua:
-- Next direction after hitting the edge of the square, indexed as
-- rotations[dx][dy] -> {new_dx, new_dy}.
local rotations = {
    [0]  = { [1] = {-1, 0}, [-1] = {1, 0} },
    [1]  = { [0] = {0, 1} },
    [-1] = { [0] = {0, -1} },
}

--- Demo task: walks magic effects around a square centred on the creature,
-- pausing 100 between steps, then creates a teleport to the last step.
-- The walk ends once the number of effects exceeds a limit that is re-rolled
-- (math.random(5,10) * size) on every loop pass.
-- `wait` is the helper installed in the task's environment.
-- @param cid creature the effect walk is centred on
function lights(cid)
    local size = 4
    local x, y = 0, 0
    local dx, dy = 1, 0
    local efects = 0
    local startPos = getCreaturePosition(cid)
    while efects <= math.random(5,10)*size do
        local leaves_square = math.abs(x + dx) > size or math.abs(y + dy) > size
        if leaves_square then
            -- Turn instead of stepping outside the square.
            local turn = rotations[dx][dy]
            dx, dy = turn[1], turn[2]
            doCreatureSay(cid,"rotating",1)
        else
            x, y = x + dx, y + dy
            efects = efects + 1
            doSendMagicEffect({x=startPos.x + x, y = startPos.y + y, z = startPos.z}, 18)
            wait(100)
        end
    end
    doCreateTeleport(1387, getCreaturePosition(cid), {x=startPos.x + x, y = startPos.y + y, z = startPos.z})
end
--- Item-use handler for the lights demo: builds a task around `lights` for
-- the using creature. Returns nothing.
function onUse(cid, var, item, fromPosition, toPosition, itemEx, combat)
    TaskHandler:new(lights, cid)
end
@Edit
I have rewritten this and added waiting on signals. I wrote a sample bomb script with detonators to shed some light on this.
Setting up bomb script :
Lua:
--- Demo task: announces the bomb, blocks until the "detonate" signal arrives,
-- then announces the explosion and shows a fire effect at `setupPos`.
-- `wait_signal` is the helper installed in the task's environment.
-- @param cid creature that set up the bomb
-- @param setupPos position passed to doSendMagicEffect
function bomb(cid, setupPos)
    local function say(text)
        doCreatureSay(cid, text, TALKTYPE_MONSTER)
    end

    print("--Parameters--")
    -- NOTE(review): unpack only returns array entries; if setupPos is an
    -- {x=, y=, z=} table this prints just cid - confirm the intent.
    print(cid, unpack(setupPos))
    print("----------------")
    say("Bomb has been set up, lets w8 to detonate it")
    -- Execution continues only once the proper signal is fed in; the detonator does that job.
    wait_signal("detonate")
    say("Bombed")
    doSendMagicEffect(setupPos,CONST_ME_FIREATTACK)
end
--- Item-use handler that sets up a bomb task for the player.
-- @return true when the task was registered; nothing (after sending a cancel
--         message) when the player already has a "bomb" task
function onUse(cid, item, fromPosition, itemEx, toPosition)
    local bomb_thread = g_threads:create(bomb, cid, fromPosition)
    local registered = g_threads:addPlayerThread(cid,"bomb",bomb_thread)
    if not registered then
        doPlayerSendCancel(cid, "Wait untill your bomb explode, before you start setting up another one")
        return
    end
    return true
end
Detonator script:
Lua:
--- Item-use handler for the detonator: sends "detonate" to the player's bomb
-- task, or tells the player to set a bomb up first.
-- @return true in both cases
function onUse(cid, item, fromPosition, itemEx, toPosition)
    -- getThread is a method (needs `:`), and the setup script registers the
    -- task under the name "bomb".
    local bomb_thread = g_threads:getThread(cid, "bomb")
    if bomb_thread == nil then
        doPlayerSendCancel(cid, "Setup your bomb first.")
        return true
    end
    bomb_thread:sendSignal("detonate")
    return true
end
Producer-consumer problem.
I recalled this problem from my class and wondered how the alpha version of this lib would handle it. Back then it could not handle it at all, which inspired me to write this update.
We have two threads: one produces values and the other does something with the values that have been produced. We don't want to stop either of them, only to switch execution between them. That is what the code below provides.
Lua:
-- Number of values the producer emits before it stops (global, read by producer()).
break_value = 10
--- Consumer task: loops forever, each time blocking on "product_receive",
-- printing the received value and acknowledging it to the producer task.
-- `wait_signal` is the helper installed in the task's environment.
function consumer()
    repeat
        -- The task waits for a signal whose payload is the produced value.
        local received = wait_signal("product_receive")
        print("Received value, ".. tostring(received))
        -- Signal back to the producer so it can resume production.
        producer_thread:sendSignal("consumed", received)
    until false
end
--- Producer task: emits the values 1..break_value, one at a time, waiting for
-- the consumer's "consumed" acknowledgement after each.
-- `break_value` is re-read on every pass; `wait_signal` is the helper
-- installed in the task's environment.
function producer()
    local produced = 0
    while produced + 1 <= break_value do
        produced = produced + 1
        print("Value has been produced."..produced)
        -- Tell the consumer that a value is ready to be consumed.
        consumer_thread:sendSignal("product_receive", produced)
        -- Now wait for the acknowledgement.
        wait_signal("consumed")
    end
end
-- Create and name both tasks; create() registers them without waking the scheduler.
producer_thread = g_threads:create(producer)
producer_thread:setName("producer")
consumer_thread = g_threads:create(consumer)
-- Name spelled out correctly so debug output identifies the task.
consumer_thread:setName("consumer")
-- Tell the scheduler that new threads were created.
g_threads.signalhandler:sendSignal("signal_awake")
Change Log:
*added possibility to send signals between threads,
* better handling of threads in general.
* added singleton class to handle all running threads.
* Producer-consumer problem implemented as an example
[/code]
Attachments
-
Stock-Images-Separator-GraphicsFairy31.jpg32.8 KB · Views: 6 · VirusTotal
Last edited: