L3-Bataille-Navale/source/utils/thread.py

34 lines
1 KiB
Python

from queue import Queue
from threading import Thread
from typing import Any, Callable
import pyglet
class StoppableThread(Thread):
"""
A thread that can be stopped.
The run method need to check for the "self._stop" variable and return manually if it is true.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.stopped = False
def stop(self) -> None:
self.stopped = True
def in_pyglet_context(func: Callable, *args, **kwargs) -> Any:
"""
This function can be call in a thread. It will call the "func" in the pyglet event loop, avoiding
some operation that are not allowed outside of the pyglet context, and return its result
:param func: the function to call in the pyglet context
:param args: the args of the function
:param kwargs: the kwargs of the function
:return: the result of the function
"""
queue = Queue()
pyglet.clock.schedule_once(lambda dt: queue.put(func(*args, **kwargs)), 0)
return queue.get()