2025-09-23 17:14:11 +03:00
|
|
|
from io import DEFAULT_BUFFER_SIZE
|
2025-09-23 15:53:51 +03:00
|
|
|
import os
|
|
|
|
import time
|
|
|
|
import json
|
2025-09-23 17:14:11 +03:00
|
|
|
from psycopg2.sql import NULL
|
2025-09-23 15:53:51 +03:00
|
|
|
import requests
|
|
|
|
import psycopg2 as ps
|
|
|
|
|
|
|
|
def read_file(path):
|
|
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
|
|
return f.read()
|
|
|
|
|
|
|
|
def getenv(name):
|
|
|
|
try:
|
|
|
|
return os.environ[name]
|
|
|
|
except KeyError:
|
2025-09-23 17:14:11 +03:00
|
|
|
print(f"[ERROR] Missing environment variable {name}. You should NOT run this script by hand, please use systemd mastodon-kanidm-sync.service.")
|
2025-09-23 15:53:51 +03:00
|
|
|
exit(1)
|
|
|
|
|
|
|
|
|
2025-09-23 17:14:11 +03:00
|
|
|
# Import configuration
|
2025-09-23 15:53:51 +03:00
|
|
|
KANIDM_URL = getenv("KANIDM_URL")
|
|
|
|
KANIDM_TOKEN = read_file(getenv("KANIDM_TOKEN_PATH")).strip()
|
2025-09-23 23:11:25 +03:00
|
|
|
OWNER_USERNAME = getenv("OWNER_USERNAME")
|
2025-09-23 17:14:11 +03:00
|
|
|
|
2025-09-23 22:49:51 +03:00
|
|
|
# Fetch kanidm users list from userdata file
|
2025-09-23 17:14:11 +03:00
|
|
|
# Userdata file is json list with information about what users are configured by kanidm
|
2025-09-23 22:49:51 +03:00
|
|
|
try:
|
|
|
|
USERDATA = read_file(getenv("USERDATA_FILE_PATH")).strip()
|
|
|
|
userdata = json.loads(USERDATA)
|
|
|
|
except FileNotFoundError:
|
|
|
|
userdata = []
|
2025-09-23 15:53:51 +03:00
|
|
|
|
2025-09-23 17:14:11 +03:00
|
|
|
# Load database
|
2025-09-23 15:53:51 +03:00
|
|
|
conn = ps.connect(
|
|
|
|
dbname=getenv("POSTGRES_DBNAME"),
|
|
|
|
user=getenv("POSTGRES_USER"),
|
|
|
|
host=getenv("POSTGRES_HOST")
|
|
|
|
)
|
|
|
|
|
2025-09-23 17:14:11 +03:00
|
|
|
# Fetch current userdata from database
|
2025-09-23 15:53:51 +03:00
|
|
|
cur = conn.cursor()
|
|
|
|
cur.execute('''
|
|
|
|
SELECT identities.uid, users.id, user_roles.name
|
|
|
|
FROM users
|
|
|
|
JOIN identities
|
|
|
|
ON users.id = identities.id
|
|
|
|
LEFT JOIN user_roles
|
|
|
|
ON users.role_id = user_roles.id;
|
|
|
|
'''
|
|
|
|
)
|
|
|
|
|
|
|
|
state = cur.fetchall()
|
|
|
|
|
2025-09-23 17:14:11 +03:00
|
|
|
users = {}
|
|
|
|
for i in state:
|
|
|
|
users[i[0]] = {
|
|
|
|
"id": i[1],
|
|
|
|
"role": i[2],
|
|
|
|
"isKanidmUser": False
|
|
|
|
}
|
|
|
|
|
|
|
|
# Fetch Kanidm userdata
|
2025-09-23 15:53:51 +03:00
|
|
|
kanidm_users_raw = requests.get(
|
|
|
|
f"{KANIDM_URL}/v1/person",
|
|
|
|
headers={
|
|
|
|
"Authorization": f"Bearer {KANIDM_TOKEN}",
|
|
|
|
"Content-Type": "application/json",
|
|
|
|
},
|
|
|
|
timeout=5,
|
|
|
|
).json()
|
|
|
|
|
|
|
|
for i in kanidm_users_raw:
|
|
|
|
i = i["attrs"]
|
2025-09-23 17:14:11 +03:00
|
|
|
for uid in i["name"]: # [user].attrs.name is a list
|
2025-09-23 23:07:05 +03:00
|
|
|
if uid in users: # Don't apply anything for users who have no mastodon access (sp.mastodon.users) or didn't register
|
2025-09-23 23:11:25 +03:00
|
|
|
if uid == OWNER_USERNAME:
|
|
|
|
users[uid]["isKanidmUser"] = True
|
|
|
|
users[uid]["role"] = "Owner"
|
|
|
|
continue
|
|
|
|
|
2025-09-23 22:54:10 +03:00
|
|
|
for group in i["memberof"]:
|
2025-09-23 23:07:05 +03:00
|
|
|
if group.startswith("sp.mastodon.admins@") or group.startswith("sp.admins@"):
|
2025-09-23 17:14:11 +03:00
|
|
|
if uid not in userdata:
|
|
|
|
userdata.append(uid)
|
2025-09-23 23:40:12 +03:00
|
|
|
print("a")
|
2025-09-23 17:14:11 +03:00
|
|
|
users[uid]["isKanidmUser"] = True
|
|
|
|
users[uid]["role"] = "Admin"
|
|
|
|
print(f"[INFO] {uid} got role Admin")
|
|
|
|
break
|
2025-09-23 23:07:05 +03:00
|
|
|
elif group.startswith("sp.mastodon.moderators@"):
|
2025-09-23 17:14:11 +03:00
|
|
|
if uid not in userdata:
|
|
|
|
userdata.append(uid)
|
|
|
|
users[uid]["isKanidmUser"] = True
|
|
|
|
users[uid]["role"] = "Moderator"
|
|
|
|
print(f"[INFO] {uid} got role Moderator")
|
|
|
|
break
|
|
|
|
elif uid in userdata:
|
|
|
|
# If user, who previously had a role, has no roles set by Kanidm, delete them from userdata list so allow setting roles directly by mastodon
|
|
|
|
users[uid]["isKanidmUser"] = True
|
|
|
|
users[uid]["role"] = None
|
|
|
|
userdata.remove(uid)
|
|
|
|
print(f"[INFO] {uid} has no roles")
|
|
|
|
|
|
|
|
print("[DEBUG] ", users) # DEBUG
|
2025-09-23 15:53:51 +03:00
|
|
|
|
|
|
|
cur.close()
|
|
|
|
conn.close()
|
2025-09-23 17:14:11 +03:00
|
|
|
|
|
|
|
print("[INFO] Final userdata file: ", userdata)
|
2025-09-23 22:46:49 +03:00
|
|
|
|
|
|
|
def write_userdata(mode):
|
|
|
|
with open(getenv("USERDATA_FILE_PATH"), mode) as f:
|
|
|
|
f.write(json.dumps(userdata))
|
|
|
|
f.close()
|
|
|
|
|
|
|
|
try:
|
|
|
|
write_userdata("w")
|
2025-09-23 23:34:17 +03:00
|
|
|
print("aw")
|
2025-09-23 22:46:49 +03:00
|
|
|
except FileNotFoundError:
|
|
|
|
write_userdata("x")
|
2025-09-23 23:34:17 +03:00
|
|
|
print("ax")
|