
[lib] Semi-thread lib with signal handling and pausing. [+ examples]!!

tarjei

Hello, I came across Snejk's thread about threading in Lua. Today I want to share a different approach to the topic.

Lua:
TaskHandler = {}
function TaskHandler:new(callback, ...)
	local obj = {}
	self.__index = self
	setmetatable(obj, self)
	
	
	obj.idle_loops = 0
	
	obj.waiting_signals = {}
	obj.pending_signals = {}
	
	obj.parameters = {}
	obj.__debug = nil
	obj.setEnvironment(obj, callback)
	--coroutine.create accepts only the function; arguments are passed later through :continue(...)
	obj.coroutine = coroutine.create(callback)
	return obj

end
function TaskHandler:debug(...)
	if self.__debug ~= nil then
		return print("[Thread : "..tostring(self.name).."]", unpack(arg))
	end	
end

function TaskHandler:addWaitSignal(name)
		self.waiting_signals[name] = true

end
function TaskHandler:releaseSignal(name)
		self.waiting_signals[name] = nil

end
function TaskHandler:setName(name)
	self.name = name
end
function TaskHandler:recovery(preventsignal)
	g_threads:setId(self)
	g_threads.threads[self:getId()] = self 
	if preventsignal == nil then
	    g_threads.signalhandler:sendSignal("signal_awake")
	end
end
function TaskHandler:suspend()
	table.insert(g_threads.releasedIds, self:getId())
	g_threads.threads[self:getId()] = nil
end
function TaskHandler:wait(delay)

	 self:debug("---WAITING : "..tostring(delay))
	 
	 --temporarily remove from main coroutine execution loop
	 self:suspend()
	 coroutine.yield("wait", delay)
end

function TaskHandler:wait_signal(...)
	 --Counts all waiting threads, to freeze the signal thread if all threads are waiting

	 self:debug("---SIGNAL AWAITING HERE --- ")
	 for k, v in pairs(arg) do

		if type(v) == "string" and tonumber(v) == nil then
			self.waiting_signals[v] = v
		end
	 end
	 --Maybe we received a signal before we demanded it?
	 local lastArg = self:handlePendingSignal()
	 

	  self:debug("sig wait: "..table.maxv(self.waiting_signals))
	  self:debug("sig pending: "..table.maxv(self.pending_signals))
	 if table.maxv(self.waiting_signals) > 0  then
	  self:debug("---SIGNAL AWAITING OUT ---yielded ")
		return coroutine.yield("wait_signal", arg)
	 else
		return lastArg and unpack(lastArg) or nil
	 end
	 
end
function TaskHandler:sendSignal(signal,...)
	self:debug("-----Sending signal--------"..tostring(signal))
	self:debug("--- status: "..tostring(self.waiting_signals[signal]))
	--self:debug("---amount of waiting signls: "..table.maxv(self.waiting_signals))
	self:debug("-------------------------")
	if type(signal) == "string"  then
		self:debug("Awaiting for signals :"..tostring(table.maxv(self.waiting_signals) - 1))

		if coroutine.running() ~= nil then
			self:addPendingSignal(signal, arg)
		else
			self:releaseSignal(signal)
			self:continue(unpack(arg))
		end
		
	
	end
	
end
function TaskHandler:addPendingSignal(signal, arg)
	self:debug("-----Arguments of signal", unpack(arg))
	table.insert(self.pending_signals , {sig = signal, arg = arg})

end
function TaskHandler:handlePendingSignal()
	--Thread is not waiting for any signal to be processed, ignore it
	if table.maxv(self.waiting_signals) == 0 then
		return nil
	end
	local size = table.maxv(self.pending_signals)
	--self:debug(tostring(self.name).."//Handling pending signals ")
	self:debug("Handling pending signals "..size)
	--self:debug("Waiting signals "..table.maxv(self.waiting_signals))

			local signal_p = self.pending_signals[1] 
			if signal_p == nil then
			  return nil
			end
		 self:releaseSignal(signal_p.sig)
		 self:debug("sig pending: "..table.maxv(self.pending_signals))
		table.remove(self.pending_signals,1)
		self:debug("sig pending after removal: "..table.maxv(self.pending_signals))
		return signal_p.arg
end
function TaskHandler:continue(...)

	if coroutine.status(self.coroutine) ~= 'dead' and table.maxv(self.waiting_signals) == 0 then
		local succes, sig_type, signals = coroutine.resume(self.coroutine, unpack(arg))
		if not succes then
			--on failure sig_type holds the error message
			print("COROUTINE ERROR")
			print(debug.traceback(self.coroutine, sig_type))
		end
		
		if sig_type == "wait_signal" and signals ~= nil then
			for k, v in pairs(signals) do
				self:debug("[:continue] parameters of yield .. "..k..":"..tostring(v))
			end
			return
		end
		if sig_type == "wait" and tonumber(signals) ~= nil then
			--Lets schedule restoring this thread to threads main coroutine
			addEvent(function() 
				self:recovery()
			end, tonumber(signals))
			return
		end
		if sig_type == "suspend" then
			--self:continue() 
			return
		end
		if sig_type == "switch_thread" then
			signals.thread:continue(unpack(signals.arg))
			return
		end			
		if sig_type == "escape" then
			self:debug("Escaping from thread.. executing callback()")
			local callback = signals.callback
			self:continue(callback(unpack(signals.arg)))
		end
		self.idle_loops  = self.idle_loops + 1
	end	 
	if coroutine.status(self.coroutine) == 'dead' then
		self:kill()

	end	
	return self.idle_loops 
end
function TaskHandler:kill()
	if self.onEnd ~= nil and type(self.onEnd) == "function" then
		self.onEnd()
	end
	if self.playerExecuter ~= nil then
		g_threads.threads[self.playerExecuter][self.name] = nil
	end	
	if self.id then
	g_threads.threads[self.id] = nil
	end
end
function TaskHandler:setEnvironment(func)
		
		local sandbox = {}
		sandbox.print = print
		sandbox.suspend = function()
			self:suspend()
		end
		
		setmetatable(sandbox, {__index = _G})
		
		sandbox.wait_signal = function(...)
			self:debug("Awaiting thread signal, name:"..tostring(table.maxv(arg)))
			return self:wait_signal(unpack(arg))
		end
		sandbox.wait = function(delay)
			self:wait(delay)
		end	
		sandbox.escape = function(callback, ...)
			coroutine.yield("escape",  {callback = callback, arg = arg})

			
		end
		
		setfenv(func, sandbox)
end
function TaskHandler:setId(id)
	 self:debug("[TaskHandler:setId]")
	self.id = id
end
function TaskHandler:getId()
	return self.id
end
------------------------
------------------------
------------------------
ThreadsHandler = {}
function ThreadsHandler:new()
		local obj = {}
		obj.threads = {}
		obj.waiting_threads = 0
		obj.releasedIds = {}
		self.__index = self
		setmetatable(obj,self)
		return obj
end
function ThreadsHandler:run()
	self:signalHandler()
end
function ThreadsHandler:setId(thread)
		local id = nil
		if table.maxv(self.releasedIds) > 0 then
			 id = self.releasedIds[1]
			 table.remove(self.releasedIds,1)
			 thread:setId(id)
			 return true
		end
		thread:setId(table.maxv(self.threads) + 1)	
end
function ThreadsHandler:getThread(cid, name)
	if self.threads[getPlayerName(cid)] == nil then
	  return nil
	end
	
	return self.threads[getPlayerName(cid)][name]
end
function ThreadsHandler:create(callback, ...)
		local thread = TaskHandler:new(callback, unpack(arg))
		--We put parameters of first signal to queue
		table.insert(thread.parameters, arg)
		thread.__debug = nil

		thread:recovery(true)
		
		return thread
end	
function ThreadsHandler:countWaitingThreads()
	for k, v in pairs(self.threads) do
		local wait_sigs = v.waiting_signals_count
		
	end
end
function ThreadsHandler:signalHandler()
	--Prevent infinite execution
	local suspentionLimit =  10
	local func = function()
		 print("SIGNAL HANDLER")
		local iterator = 0
		local pending = 0
		while true do
			iterator = iterator + 1
			local size = table.maxv(self.threads)
			--print("Status -- waiting threads : ".. g_threads:getWaitingThreads()  .. " All threads: "..size)
			if g_threads:getWaitingThreads() == size then
				-- this condition provides one extra loop over threads, to make sure all pending signals were handled
				-- Warning! If you send signals that are never handled, it might cause infinite loops.
				if pending == 0 then
					wait_signal("signal_awake")
				else
					pending = 0 
				end
			end
			--update size variable
			size = table.maxv(self.threads)
			
				for k, thread in ipairs(self.threads) do
					
					--Thread might have received signals before it was even started, so let's check the pending box
					escape(function() 
						--Threads should be handled "outside" of the main coroutine, that's why I used escape.
						if table.maxv(thread.pending_signals) > 0 then
								pending = pending + 1
						end
						thread:debug("Thread execute begin")
						thread:debug("Suspention: 1)"..tostring(thread.idle_loops).."2)"..tostring(suspentionLimit))
						local lastArg = thread:handlePendingSignal()
						table.remove(thread.parameters,1)
						local suspend = thread:continue(lastArg ~= nil and unpack(lastArg))
						thread:debug("Thread execute ended")
						if thread.idle_loops >= suspentionLimit then
							--Temporarily remove from scheduler.
							thread:suspend()
							--reset idle loops counter 
							thread.idle_loops = 0
						end
						--TaskHandler:handlePendingSignals()
						
					end )

				end
			if g_threads:getWaitingThreads() < size  and size > 0 or pending > 0 then
				
			else
				wait_signal("signal_awake")
			end
			
		end
	end
	self.signalhandler = TaskHandler:new(func)
	self.signalhandler:continue()
	
end
function ThreadsHandler:addThread(name, thread)

end
function ThreadsHandler:addPlayerThread(cid, name, thread)
		if self:getThread(cid, name) == nil then
			if self.threads[getPlayerName(cid)] == nil then
				self.threads[getPlayerName(cid)] = {}
			end
			
			thread.name = name
			thread.playerExecuter = getPlayerName(cid)
			self.threads[getPlayerName(cid)][name] = thread
			--parameters[1] holds the argument list stored by :create
			thread:continue(unpack(thread.parameters[1] or {}))
			
			return true
		end
		return false
end
function ThreadsHandler:getWaitingThreads()
	local ret = 0
	for k, v in pairs(self.threads) do
		if table.maxv(v.waiting_signals) > 0 then
		 ret = ret + 1
		end
	end
	return ret
end
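function ThreadsHandler:killThread(cid, name)
		local thread = self:getThread(cid, name)
		if thread == nil then
			return
		end
		self.threads[getPlayerName(cid)][name] = nil
		--ThreadsHandler has no debug method, so log through the thread itself
		thread:debug("Killed a thread")
		self.waiting_threads = self.waiting_threads - 1
end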

g_threads = ThreadsHandler:new()
g_threads:run()
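
One note on the code above: it calls table.maxv in many places, which is not part of standard Lua and is not defined in this post. Judging only from how it is used (compared against 0, used as a thread count), it presumably returns the number of entries in a table. A minimal stand-in under that assumption, to be loaded before the library, might look like this:

```lua
-- Assumption: table.maxv counts every key/value pair in a table,
-- including string keys (which the # operator would ignore).
-- This is a guess based on how the library uses it, not the author's code.
if table.maxv == nil then
	function table.maxv(t)
		local count = 0
		for _ in pairs(t or {}) do
			count = count + 1
		end
		return count
	end
end
```

If your server distribution already ships its own table.maxv, the guard at the top leaves it untouched.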

It has one simple advantage: inside the thread function you can pause execution. It uses addEvent for that purpose, and it is quite efficient.
Here is a really simple showcase.

Lua:
function testEntryPoint(cid, ...)
	doCreatureSay(cid,"Delaying message",1)
	wait(1000) --delays for a second
	doCreatureSay(cid,"Woot, it gots delayed :D",1)
	for k, v in ipairs(arg) do --ipairs skips the extra arg.n field
		doCreatureSay(cid, v ,1)  --speaks out every additional argument you pass to the function
		wait(3000) -- delays 3 seconds
	end
end
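--create the thread through the scheduler so it gets an id (wait() needs one), then start it
local core = g_threads:create(testEntryPoint, cid, "text1", "text2")
core:continue(cid, "text1", "text2")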
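
For anyone curious how a pause like this can work, here is a reduced sketch of the idea, not the library itself: the coroutine yields the delay, and a timer callback resumes it later. The addEvent below is a hypothetical stand-in with a simulated clock so the snippet runs in a plain Lua interpreter; on a server the real addEvent would be used instead.

```lua
-- Stand-in for the server's addEvent: stores callbacks with a due time.
-- (Hypothetical; the real addEvent is provided by the game server.)
local now, timers = 0, {}
local function addEvent(callback, delay)
	table.insert(timers, {due = now + delay, callback = callback})
end

-- Runs due callbacks in time order until none are left.
local function runTimers()
	while #timers > 0 do
		table.sort(timers, function(a, b) return a.due < b.due end)
		local timer = table.remove(timers, 1)
		now = timer.due
		timer.callback()
	end
end

local log = {}

-- Resumes the coroutine; if it yields a delay, schedules the next resume.
local function step(co)
	local ok, delay = coroutine.resume(co)
	if not ok then error(delay) end
	if coroutine.status(co) ~= "dead" and delay then
		addEvent(function() step(co) end, delay)
	end
end

local co = coroutine.create(function()
	table.insert(log, "start@" .. now)
	coroutine.yield(1000) -- plays the role of wait(1000)
	table.insert(log, "after 1s@" .. now)
	coroutine.yield(3000) -- plays the role of wait(3000)
	table.insert(log, "after 4s@" .. now)
end)

step(co)
runTimers()
print(table.concat(log, ", "))
```

The function body reads top to bottom like blocking code, but nothing blocks: between the yields control goes back to whoever called step.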

If someone wants to check out sn3ejk's approach: link

- - - Updated - - -

Sample Script :
Lua:
local roatations = {
	[0] = {[1] = {-1, 0},
		   [-1] = {1, 0}},
	 [1] = {[0] = {0, 1}},
	 [-1] = {[0] = {0,-1}}
}
function lights(cid)
	local size = 4
	local currentAngle = 0
		local x = 0
		local y = 0
		local dx = 1
		local dy = 0
		local efects = 0
		local startPos = getCreaturePosition(cid)
	while true do
		if efects > math.random(5,10)*size then break end
		if math.abs(x + dx) > size or math.abs(y + dy) > size then
			local rot = roatations[dx][dy]
			dx = rot[1]
			dy = rot[2]
			doCreatureSay(cid,"rotating",1)
		else
			x = x + dx
			y = y + dy	
			efects = efects + 1
			doSendMagicEffect({x=startPos.x + x, y = startPos.y + y, z = startPos.z}, 18)
			wait(100)
		end
	end
	doCreateTeleport(1387, getCreaturePosition(cid), {x=startPos.x + x, y = startPos.y + y, z = startPos.z})

end


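function onUse(cid, item, fromPosition, itemEx, toPosition)
	--create the thread through the scheduler so it gets an id (wait() needs one), then start it
	local core = g_threads:create(lights, cid)
	core:continue(cid)
	return true
end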

@Edit
I have rewritten this and added signal waiting. I wrote a sample bomb script with detonators to shed some light on this.

Setting up bomb script :
Lua:
function  bomb(cid, setupPos)
		
		print("--Parameters--")
		--a position is a table with x, y, z keys, so unpack() would print nothing
		print(cid, setupPos.x, setupPos.y, setupPos.z)
		print("----------------")
		doCreatureSay(cid, "Bomb has been set up, lets w8 to detonate it", TALKTYPE_MONSTER)
		--This will execute, only when we feed it with proper signal, detonator does that job:D
		wait_signal("detonate")
		doCreatureSay(cid, "Bombed", TALKTYPE_MONSTER)
		doSendMagicEffect(setupPos,CONST_ME_FIREATTACK)
		
end






function onUse(cid, item, fromPosition, itemEx, toPosition)
        local bomb_thread = g_threads:create(bomb, cid, fromPosition)
	if g_threads:addPlayerThread(cid,"bomb",bomb_thread)  then
	     return true
	end

	doPlayerSendCancel(cid, "Wait until your bomb explodes before you set up another one.")
	return true
end

Detonator script:
Lua:
function onUse(cid, item, fromPosition, itemEx, toPosition)
	--the thread was registered under the name "bomb" by the setup script
	local bomb_thread = g_threads:getThread(cid, "bomb")
	if bomb_thread == nil then
		doPlayerSendCancel(cid, "Set up your bomb first.")
		return true
	end

	bomb_thread:sendSignal("detonate")
	return true
end
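
To show the signal idea in isolation, here is a stripped-down sketch using only standard coroutines. Task, waitSignal and the other names are hypothetical and much simpler than the library above: a task either yields until the named signal arrives, or picks up a signal that was sent before it asked for it.

```lua
local unpack = table.unpack or unpack -- works on Lua 5.1 and 5.2+

-- Hypothetical minimal task: a coroutine plus the signal it is waiting for.
local Task = {}
Task.__index = Task

function Task.new(fn)
	return setmetatable({co = coroutine.create(fn), waiting = nil, pending = {}}, Task)
end

-- Called from inside the task body: blocks until `name` is delivered.
function Task:waitSignal(name)
	-- A signal that arrived early is consumed without yielding.
	local early = self.pending[name]
	if early then
		self.pending[name] = nil
		return unpack(early)
	end
	self.waiting = name
	return coroutine.yield()
end

-- Called from outside: resumes the task if it waits for `name`,
-- otherwise queues the signal until the task asks for it.
function Task:sendSignal(name, ...)
	if self.waiting == name then
		self.waiting = nil
		assert(coroutine.resume(self.co, ...))
	else
		self.pending[name] = {...}
	end
end

function Task:start(...)
	assert(coroutine.resume(self.co, ...))
end

local events = {}
local bomb
bomb = Task.new(function(owner)
	table.insert(events, owner .. " armed the bomb")
	local by = bomb:waitSignal("detonate")
	table.insert(events, "detonated by " .. by)
end)

bomb:start("player")
bomb:sendSignal("detonate", "detonator item")
print(table.concat(events, "; "))
```

The arguments given to sendSignal become the return values of waitSignal, which is the same way the library hands values to a waiting thread.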


Producer-consumer problem.
I recalled this problem from my class and wondered how the alpha version of this lib would handle it. It could not handle it at all back then, which inspired me to write an update.
We have two threads: one produces values and the other does something with the values that have been produced. We don't want to stop either of them, only to switch between their execution. That is what the code below provides.

Lua:
break_value = 10
function consumer()
	while true do
			
			--Thread awaits a signal with value passed to it as a parameter
			local value = wait_signal("product_receive")

				print("Received value, ".. tostring(value))
				--Send a back signal to producer thread, so it can recover production
			
				producer_thread:sendSignal("consumed", value)


	end
end

function producer()
	local i = 0
	while true do
		i = i + 1
		if i > break_value then
		 break
		end

		  print("Value has been produced."..i)
		  --Send information about value has been produced, so consumer can 'consume' value :P
		  consumer_thread:sendSignal("product_receive", i)
		  --Now lets wait for a signal back
		  wait_signal("consumed")
		
	end
end

 producer_thread = g_threads:create(producer)
 producer_thread:setName("producer")
   consumer_thread = g_threads:create(consumer)
   consumer_thread:setName("consumer")
   --Send signal to scheduler that new threads were created.
	g_threads.signalhandler:sendSignal("signal_awake")
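
For comparison, the same hand-off can be reduced to plain standard coroutines. This is only a sketch of the switching idea, without the library's signals or scheduler, and the names are made up for the illustration.

```lua
-- Consumer: receives one value per resume and records it.
local consumed = {}
local consumer = coroutine.create(function(value)
	while true do
		table.insert(consumed, value)
		value = coroutine.yield() -- hand control back to the producer
	end
end)

-- Producer: creates values 1..limit and passes each one to the consumer.
local function produce(limit)
	for i = 1, limit do
		assert(coroutine.resume(consumer, i))
	end
end

produce(3)
print(table.concat(consumed, ","))
```

Neither side is ever stopped for good; execution just switches at the resume and yield calls.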

Change Log:
* added the possibility to send signals between threads
* better handling of threads in general
* added a singleton class to handle all running threads
* implemented the producer-consumer problem as an example

Great to see such skilled libs, but the problem is that Lua doesn't really support multithreading, so it's not as useful as it looks.
 
Yeah, indeed Lua does not support multithreading at all, I guess, but it's nice to mess around with stuff like that. I guess I will use it for event library handling instead of using addEvents alone :p
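
To make that point concrete: coroutines are cooperative, so only one runs at a time and a switch can happen only at a yield. A small round-robin sketch with standard Lua only (worker and the other names are made up for the illustration):

```lua
local trace = {}

-- Each worker records its steps and yields after every one.
local function worker(name, steps)
	return coroutine.create(function()
		for i = 1, steps do
			table.insert(trace, name .. i)
			coroutine.yield() -- the only point where another task can run
		end
	end)
end

local tasks = {worker("A", 2), worker("B", 3)}

-- Round-robin: resume every live task once per pass, drop finished ones.
while #tasks > 0 do
	local alive = {}
	for _, co in ipairs(tasks) do
		assert(coroutine.resume(co))
		if coroutine.status(co) ~= "dead" then
			table.insert(alive, co)
		end
	end
	tasks = alive
end

print(table.concat(trace, " "))
```

The interleaving is fully deterministic, which is why this kind of "semi-thread" needs no locks, but it also means a task that never yields blocks everything else.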
 
Yet another update:
+ added the possibility to send signals from inside threads
 