106 lines
2.5 KiB
Python
106 lines
2.5 KiB
Python
import subprocess
|
|
|
|
|
|
class BatmanController:
    """
    Placeholder controller: it currently keeps no state and offers no operations
    """

    # TODO: need to be able to get ping in the network if possible, otherwise no other uses

    def __init__(self):
        """
        Create the controller; there is nothing to initialise yet
        """
|
|
|
|
|
|
class ChronyController:
    """
    Wrapper to control chrony to manipulate time synchronisation

    Every operation is performed by invoking the `chronyc` command line tool.

    NOTE: the ideal method would be to use `chronyd.sock` to communicate, but it seems to only work with raw binary data
    """

    def __init__(self):
        # start from a clean state: no upstream source, and nobody allowed to use us as a server
        self.clear_sources()
        self.deny_all_clients()

    @staticmethod
    def run_command(*args: str) -> str:
        """
        Run a command through the chrony command line tool

        :param args: the arguments given to `chronyc` (options first, then the command and its arguments)
        :return: the decoded standard output of the command
        :raises subprocess.CalledProcessError: if `chronyc` exits with a non-zero status
        """

        process = subprocess.run(
            ["chronyc", *args],
            stdout=subprocess.PIPE,
            # raise an error if the command did not end successfully
            check=True,
        )

        return process.stdout.decode()

    @classmethod
    def _report_column(cls, command: str, column: int, modes: tuple = ()) -> list[str]:
        """
        Extract one column out of a `chronyc` CSV report

        :param command: the report command to run (e.g. "sources")
        :param column: the index of the column to extract
        :param modes: if not empty, only keep the rows whose first column is one of these values
        :return: the value of the column for every matching row
        """

        # -c: print the report as CSV, -n: print addresses instead of resolved hostnames
        report = cls.run_command("-c", "-n", command)

        values = []
        for line in report.splitlines():
            fields = line.split(",")
            if len(fields) <= column:
                # blank or unexpected line: nothing to extract
                continue
            if modes and fields[0] not in modes:
                continue
            values.append(fields[column])

        return values

    def get_sources(self) -> list[str]:
        """
        Get the list of NTP sources

        :return: the list of NTP sources, as addresses
        """

        # NOTE(review): assumes the CSV `sources` report is "mode,state,address,..." where the mode is
        # "^" for a server, "=" for a peer and "#" for a local reference clock (which is not an NTP
        # source and cannot be removed with `delete`) - confirm against the installed chrony version
        return self._report_column("sources", 2, modes=("^", "="))

    def add_source(self, kind: str, source: str) -> None:
        """
        Add a source to chrony

        :param kind: the kind of source understood by `chronyc add` (e.g. "server")
        :param source: the address or hostname of the source
        """

        self.run_command("add", kind, source, "iburst")

    def remove_source(self, source: str) -> None:
        """
        Remove a source from chrony

        :param source: the address or hostname of the source
        """

        self.run_command("delete", source)

    def clear_sources(self) -> None:
        """
        Clear the list of sources used by chrony
        """

        for source in self.get_sources():
            self.remove_source(source)

    def get_clients(self) -> list[str]:
        """
        Get the list of clients using us as a server

        :return: the list of clients, as addresses
        """

        # NOTE(review): assumes the first column of the CSV `clients` report is the client address
        return self._report_column("clients", 0)

    def allow_client(self, client: str) -> None:
        """
        Allow a client to use us as a server

        :param client: the client information (an address, a subnet, or "all")
        """

        self.run_command("allow", client)

    def allow_all_clients(self) -> None:
        """
        Allow all clients to use us as a time-server
        """

        self.allow_client("all")

    def deny_client(self, client: str) -> None:
        """
        Deny a client to use us as a server

        :param client: the client information (an address, a subnet, or "all")
        """

        self.run_command("deny", client)

    def deny_all_clients(self) -> None:
        """
        Deny all clients from using us as a time-server
        """

        self.deny_client("all")
|
|
|
|
|
|
if __name__ == "__main__":
    # small interactive prompt forwarding every line to `chronyc`
    import shlex

    chrony = ChronyController()

    while True:
        try:
            command = input("chronyc >>> ")
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C: leave the prompt instead of dying with a traceback
            print()
            break

        try:
            # split like a shell would, so that options and quoted arguments reach chronyc separately
            arguments = shlex.split(command)
        except ValueError as error:
            # e.g. unbalanced quotes
            print(f"invalid command: {error}")
            continue

        if not arguments:
            # running chronyc without a command would start its own interactive mode
            continue

        try:
            print(chrony.run_command(*arguments))
        except subprocess.CalledProcessError as error:
            # a failing command should not kill the prompt: show what chronyc said and carry on
            if error.stdout:
                print(error.stdout.decode(), end="")
            print(f"chronyc exited with status {error.returncode}")
|