This commit is contained in:
2025-09-24 17:05:07 +03:00
parent 4008a92a49
commit 98832beb27

View File

@@ -1,8 +1,6 @@
from io import DEFAULT_BUFFER_SIZE from io import DEFAULT_BUFFER_SIZE
import os import os
import time
import json import json
from psycopg2.sql import NULL
import requests import requests
import psycopg2 as ps import psycopg2 as ps
@@ -26,9 +24,9 @@ OWNER_USERNAME = getenv("OWNER_USERNAME")
# Fetch kanidm users list from userdata file # Fetch kanidm users list from userdata file
# Userdata file is json list with information about what users are configured by kanidm # Userdata file is json list with information about what users are configured by kanidm
try: try:
USERDATA = read_file(getenv("USERDATA_FILE_PATH")).strip() USERDATA = read_file(getenv("USERDATA_FILE_PATH")).strip()
userdata = json.loads(USERDATA) userdata = json.loads(USERDATA)
print("userdata from file loaded") print("[INFO] ")
except FileNotFoundError: except FileNotFoundError:
userdata = [] userdata = []
@@ -44,9 +42,9 @@ cur = conn.cursor()
cur.execute(''' cur.execute('''
SELECT identities.uid, users.id, user_roles.name SELECT identities.uid, users.id, user_roles.name
FROM users FROM users
JOIN identities JOIN identities
ON users.id = identities.id ON users.id = identities.id
LEFT JOIN user_roles LEFT JOIN user_roles
ON users.role_id = user_roles.id; ON users.role_id = user_roles.id;
''' '''
) )
@@ -71,45 +69,53 @@ kanidm_users_raw = requests.get(
timeout=5, timeout=5,
).json() ).json()
def give_role(uid, role, putUserdata = True):
if (uid not in userdata) and (putUserdata):
userdata.append(uid)
users[uid]["isKanidmUser"] = True
users[uid]["role"] = role
print(f"[INFO] {uid} is marked as {role}")
for i in kanidm_users_raw: for i in kanidm_users_raw:
i = i["attrs"] i = i["attrs"]
for uid in i["name"]: # [user].attrs.name is a list for uid in i["name"]: # [user].attrs.name is a list
if uid in users: # Don't apply anything for users who have no mastodon access (sp.mastodon.users) or didn't register if uid in users: # Don't apply anything for users who have no mastodon access (sp.mastodon.users) or didn't register
if uid == OWNER_USERNAME: if uid == OWNER_USERNAME:
users[uid]["isKanidmUser"] = True give_role(uid, "Owner", False)
users[uid]["role"] = "Owner"
continue continue
for group in i["memberof"]: for group in i["memberof"]:
if group.startswith("sp.mastodon.admins@") or group.startswith("sp.admins@"): if group.startswith("sp.mastodon.admins@") or group.startswith("sp.admins@"):
print(uid not in userdata) give_role(uid, "Admin")
if uid not in userdata:
userdata.append(uid)
print("a")
users[uid]["isKanidmUser"] = True
users[uid]["role"] = "Admin"
print(f"[INFO] {uid} got role Admin")
break break
elif group.startswith("sp.mastodon.moderators@"): elif group.startswith("sp.mastodon.moderators@"):
if uid not in userdata: give_role(uid, "Moderator")
userdata.append(uid)
users[uid]["isKanidmUser"] = True
users[uid]["role"] = "Moderator"
print(f"[INFO] {uid} got role Moderator")
break break
elif uid in userdata: elif uid in userdata:
# If user, who previously had a role, has no roles set by Kanidm, delete them from userdata list so allow setting roles directly by mastodon # If user, who previously had a role, has no roles set by Kanidm, delete them from userdata list so allow setting roles directly by mastodon
users[uid]["isKanidmUser"] = True give_role(uid, None, False)
users[uid]["role"] = None
userdata.remove(uid) userdata.remove(uid)
print(f"[INFO] {uid} has no roles")
print("[DEBUG] ", users) # DEBUG print("[DEBUG] ", users) # DEBUG
for uid in users:
if not users[uid]["isKanidmUser"]:
continue
if users[uid]["role"]:
roleid = users[uid]["role"]
else:
roleid = "NULL"
cur.execute("UPDATE users SET role_id = {roleid} WHERE id = {users[uid]};")
cur.close() cur.close()
conn.close() conn.close()
print("[INFO] Final userdata file: ", userdata) print("[INFO] Final userdata.json file content: ", userdata)
def write_userdata(mode): def write_userdata(mode):
with open(getenv("USERDATA_FILE_PATH"), mode) as f: with open(getenv("USERDATA_FILE_PATH"), mode) as f:
@@ -118,7 +124,6 @@ def write_userdata(mode):
try: try:
write_userdata("w") write_userdata("w")
print("aw")
except FileNotFoundError: except FileNotFoundError:
print("[INFO] userdata.json file doesn't exist. Creating it")
write_userdata("x") write_userdata("x")
print("ax")