imfreedom/ansible

4eaa5a34b086
Update the BSD agents to have all of our current dependencies
#!/usr/bin/env python3
# This script implements the dovecot checkpassword interface and ties it into
# jetbrains hub.
#
# See https://wiki2.dovecot.org/AuthDatabase/CheckPassword for more infromation
# on the interface.
import base64
import json
import os
import os.path
import sys
import urllib.request
import urllib.parse
hub_url = "{{ mail_hub_url }}"
hub_scopes = "{{ mail_hub_scopes }}"
hub_client_id = "{{ mail_hub_client_id }}"
hub_client_secret = "{{ mail_hub_client_secret }}"
hub_permanent_token = "{{ mail_hub_permanent_token }}"
hub_domain_groups = {
"pidgin.im": ["Pidgin Developer", "Pidgin Contributor", "Pidgin Email"],
"imfreedom.org": ["IMF Board"],
}
vmail_uid = "{{ mail_user }}"
vmail_gid = "{{ mail_group }}"
http_timeout = 30
def request_group_query(domain):
groups = hub_domain_groups[domain]
# use json.dumps to force quote the string
parts = [f"in:{json.dumps(x)}" for x in groups]
return " or ".join(parts)
def check_group(user, domain):
headers = {
"Authorization": f"Bearer {hub_permanent_token}",
}
try:
query = request_group_query(domain)
except KeyError:
print(f"unknown domain {domain}")
return False
params = {
"$top": "1",
"query": f"login:{user} and ({query})",
"fields": "login",
}
url = f"{hub_url}/api/rest/users?"
url += urllib.parse.urlencode(params)
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=http_timeout) as resp:
code = resp.getcode()
if code < 200 or code > 299:
print(f"invalid response {code}")
return False
data = json.loads(resp.read())
if "users" in data and len(data["users"]) == 1:
if "login" in data["users"][0] and data["users"][0]["login"] == user:
return True
return False
def validate_credentials(user, domain, password):
token = base64.b64encode(f"{hub_client_id}:{hub_client_secret}".encode("utf-8"))
strToken = token.decode("utf-8")
headers = {
"Authorization": f"Basic {strToken}"
}
body = {
"grant_type": "password",
"username": user,
"password": password,
"scope": hub_scopes,
}
data = urllib.parse.urlencode(body).encode("ascii")
url = f"{hub_url}/api/rest/oauth2/token"
req = urllib.request.Request(url, data=data, headers=headers)
with urllib.request.urlopen(req, timeout=http_timeout) as resp:
status = resp.getcode()
if status < 200 or status > 299:
return False
# if we got 2xx response, check if the user is in a group that can
# access email.
return check_group(user, domain)
return False
def main():
data = os.read(3, 512)
try:
username, password, _ = data.split(b"\x00", 2)
except ValueError:
print("invalid input")
sys.exit(1)
username = username.decode("utf-8").lower()
password = password.decode("utf-8")
try:
user, domain = username.split("@", 1)
except ValueError:
print("no domain specified")
sys.exit(1)
try:
authorized = os.environ.get("AUTHORIZED", "0")
if authorized == "1":
# userdb lookups
if not check_group(user, domain):
sys.exit(3)
os.environ["AUTHORIZED"] = "2"
else:
# passdb lookups
if not validate_credentials(user, domain, password):
print("invalid credentials")
sys.exit(1)
if len(sys.argv) > 1:
home = os.path.join("{{ mail_home }}", domain, user)
os.environ.update({
"userdb_uid": vmail_uid,
"userdb_gid": vmail_gid,
"INSECURE_SETUID": "1",
"HOME": home,
"USER": f"{user}@{domain}",
"EXTRA": "userdb_uid userdb_gid userdb_mail",
})
os.execvp(sys.argv[1], sys.argv[1:])
except Exception as e: # assume any exceptions are temporary failures
print(f"Caught exception {e}")
raise e
# if we hit an http error, return 111 to note a temporary failure.
sys.exit(111)
if __name__ == "__main__":
main()