import json
from datetime import datetime, timedelta
import logging
import argparse

import requests
import msal

from config import MsalConfig

try:
    from zoneinfo import ZoneInfo
except ImportError:
    from backports.zoneinfo import ZoneInfo

# Two-letter German weekday abbreviations keyed by datetime.weekday().
WEEKDAYS = {0: "Mo", 1: "Di", 2: "Mi", 3: "Do", 4: "Fr", 5: "Sa", 6: "So"}

# Base URL of the per-user Microsoft Graph endpoints used below.
GRAPH_USERS_URL = "https://graph.microsoft.com/v1.0/users/"

# Default timeout (seconds) for every HTTP call; without one, requests may
# block indefinitely on an unresponsive server.
REQUEST_TIMEOUT = 30

# Optional logging
# logging.getLogger("msal").setLevel(logging.INFO)  # Optionally disable MSAL DEBUG logs

# Create a preferably long-lived app instance which maintains a token cache.
app = msal.ConfidentialClientApplication(
    MsalConfig.CLIENT_ID,
    authority=MsalConfig.AUTHORITY,
    client_credential=MsalConfig.SECRET,
    # token_cache=...  # Default cache is in memory only.
    # You can learn how to use SerializableTokenCache from
    # https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache
)


def _auth_headers(token):
    """Build the bearer-token Authorization header from an MSAL token result."""
    return {'Authorization': 'Bearer ' + token['access_token']}


def get_access_token():
    """Return an MSAL token result for the application itself.

    The token cache of the module-level ``app`` is consulted first; if nothing
    suitable is cached, a new token is acquired for the client.

    Returns:
        The MSAL result dict, guaranteed to contain ``"access_token"``.

    Raises:
        AssertionError: if the result carries no access token. The error,
            its description and the correlation id are logged beforehand.
    """
    # We want a token for the app, NOT for an end user, hence account=None.
    result = app.acquire_token_silent(MsalConfig.SCOPE, account=None)
    if result is None:
        logging.info("No suitable token exists in cache. Let's get a new one from AAD.")
        result = app.acquire_token_for_client(scopes=MsalConfig.SCOPE)
    else:
        logging.info("Token was found in cache.")

    if "access_token" not in result:  # a final check
        logging.error(result.get("error"))
        logging.error(result.get("error_description"))
        logging.error(result.get("correlation_id"))  # You may need this when reporting a bug
        raise AssertionError("Was not able to get an access token. Check msal auth.")
    return result


def execute_get_request(token: dict, endpoint: str, timeout=REQUEST_TIMEOUT):
    """GET ``endpoint`` with the bearer token and return the decoded JSON body.

    Args:
        token: MSAL token result containing ``"access_token"``.
        endpoint: Full URL to request.
        timeout: Seconds to wait for the server before giving up.
    """
    response = requests.get(
        endpoint,
        headers=_auth_headers(token),
        timeout=timeout,
    )
    return response.json()


def execute_post_request(token, endpoint, data, timeout=REQUEST_TIMEOUT):
    """POST ``data`` as JSON to ``endpoint`` and return the raw response.

    Unlike the GET/PATCH helpers, the response object itself is returned and
    its body is not decoded.

    Args:
        token: MSAL token result containing ``"access_token"``.
        endpoint: Full URL to request.
        data: JSON-serialisable payload.
        timeout: Seconds to wait for the server before giving up.
    """
    return requests.post(
        endpoint,
        json=data,
        headers=_auth_headers(token),
        timeout=timeout,
    )


def execute_patch_request(token, endpoint, data, timeout=REQUEST_TIMEOUT):
    """PATCH ``endpoint`` with ``data`` as JSON and return the decoded JSON body.

    Args:
        token: MSAL token result containing ``"access_token"``.
        endpoint: Full URL to request.
        data: JSON-serialisable payload.
        timeout: Seconds to wait for the server before giving up.
    """
    response = requests.patch(
        endpoint,
        json=data,
        headers=_auth_headers(token),
        timeout=timeout,
    )
    return response.json()


def send_mail(to, subject, content, user_id=MsalConfig.USER_ID):
    """Send an HTML mail to one recipient through the user's ``sendMail`` endpoint.

    Args:
        to: Recipient e-mail address.
        subject: Mail subject.
        content: HTML body of the mail.
        user_id: Graph user on whose behalf the mail is sent.

    Returns:
        The raw response of the POST request.
    """
    token = get_access_token()
    mail = {
        "message": {
            "subject": subject,
            "body": {
                "contentType": "html",
                "content": content,
            },
            "toRecipients": [
                {"emailAddress": {"address": to}},
            ],
        },
        "saveToSentItems": "false",
    }
    return execute_post_request(token, GRAPH_USERS_URL + user_id + "/sendMail", mail)


def update_calendar_event(event_id, data, user_id=MsalConfig.USER_ID):
    """PATCH the calendar event ``event_id`` of ``user_id`` with ``data``.

    A fresh access token is obtained internally; callers do not pass one.

    Returns:
        The decoded JSON body of the PATCH response.
    """
    token = get_access_token()
    endpoint = GRAPH_USERS_URL + user_id + f"/calendar/events/{event_id}"
    return execute_patch_request(token, endpoint, data)


def execute_user_request(token, endpoint, user_id=MsalConfig.USER_ID):
    """GET the user-relative Graph ``endpoint`` and return the decoded JSON body.

    Args:
        token: MSAL token result containing ``"access_token"``.
        endpoint: Path below ``/users/{user_id}/`` (no leading slash).
        user_id: Graph user whose resources are queried.
    """
    return execute_get_request(token, GRAPH_USERS_URL + user_id + f"/{endpoint}")


def get_all_calendar_events():
    """Return the ``"value"`` entry of the events listing of the configured calendar.

    ``None`` is returned when the response has no ``"value"`` key.
    """
    token = get_access_token()
    response = execute_user_request(token, f"calendars/{MsalConfig.CALENDAR_ID}/events")
    return response.get("value")


def get_calendar_event(id, filter: str = ""):
    """Return the JSON of a single event of the configured calendar.

    Args:
        id: Event id, inserted into the URL as given.
        filter: Optional suffix appended verbatim after the id
            (e.g. a query string).
    """
    token = get_access_token()
    return execute_user_request(
        token, f"calendars/{MsalConfig.CALENDAR_ID}/events/{id}{filter}")


def get_future_calendar_events():
    """Return the calendar view JSON for today up to 365 days from now.

    The window bounds are sent as plain ``YYYY-MM-DD`` dates derived from the
    local clock.
    """
    window_start = datetime.now()
    window_end = window_start + timedelta(days=365)
    start_day = window_start.strftime("%Y-%m-%d")
    end_day = window_end.strftime("%Y-%m-%d")
    query = f"?StartDateTime={start_day}&EndDateTime={end_day}"
    token = get_access_token()
    return execute_user_request(
        token, f"calendars/{MsalConfig.CALENDAR_ID}/calendarview{query}")
def _to_local_datetime(raw):
    """Parse a Graph ``dateTime`` string, read it as UTC, convert to local time.

    The final character is dropped before parsing, as the original format
    handling did (presumably a trailing ``Z`` or a 7th fractional digit that
    ``fromisoformat`` would reject -- TODO confirm against real payloads).
    """
    parsed = datetime.fromisoformat(raw[:-1]).replace(tzinfo=ZoneInfo('UTC'))
    # astimezone() without arguments targets the system local time zone and,
    # unlike the non-IANA key 'localtime', works on every platform.
    return parsed.astimezone()


def convert_datetimes(events):
    """Add local-time display fields to each event, in place.

    For every event, ``start``/``end`` gain ``date`` (``DD.MM.YYYY``) and
    ``time`` (``HH:MM:SS``) entries, and the event gains ``duration`` (the
    string form of end minus start without its last three characters) and
    ``weekday`` (abbreviation looked up in ``WEEKDAYS``).

    Args:
        events: Iterable of event dicts with ``start.dateTime`` and
            ``end.dateTime`` strings.

    Returns:
        The same ``events`` object that was passed in.
    """
    for event in events:
        start_date_time = _to_local_datetime(event["start"]["dateTime"])
        stop_date_time = _to_local_datetime(event["end"]["dateTime"])

        event["start"]["date"] = start_date_time.strftime("%d.%m.%Y")
        event["start"]["time"] = start_date_time.strftime("%H:%M:%S")
        event["end"]["date"] = stop_date_time.strftime("%d.%m.%Y")
        event["end"]["time"] = stop_date_time.strftime("%H:%M:%S")
        # str(timedelta) ends in ":SS"; cutting three characters leaves "H:MM".
        event["duration"] = str(stop_date_time - start_date_time)[:-3]
        event["weekday"] = WEEKDAYS[start_date_time.weekday()]
    return events


def add_attendee(data, name, email):
    """Append a required attendee with no response yet to ``data["attendees"]``.

    The ``"attendees"`` list is created when ``data`` does not have one.
    """
    attendee = {
        "type": "required",
        "status": {
            "response": "none",
            "time": "0001-01-01T00:00:00Z",
        },
        "emailAddress": {
            "name": name,
            "address": email,
        },
    }
    data.setdefault("attendees", []).append(attendee)


def delte_attendee(data, email):
    """Remove the first attendee whose address equals ``email``, if any.

    Nothing happens when no attendee matches or ``data`` has no
    ``"attendees"`` list.
    """
    attendees = data.get("attendees", [])
    for i, attendee in enumerate(attendees):
        if attendee["emailAddress"]["address"] == email:
            del attendees[i]
            break


# Correctly spelled alias; the original name is kept for existing callers.
delete_attendee = delte_attendee


def main():
    """Command-line entry point for manually exercising the Graph helpers."""
    logging.basicConfig(
        format='%(asctime)s,%(msecs)d %(levelname)-4s [%(filename)s:%(lineno)d] %(message)s',
        datefmt='%Y-%m-%d:%H:%M:%S',
        level=logging.INFO)

    parser = argparse.ArgumentParser(
        description='Inspect and test Microsoft Graph calendar and mail access.')
    parser.add_argument('--show_calendars', help='Show available calendars', action='store_true')
    parser.add_argument('--show_event_entries', help='Show available event fields', action='store_true')
    parser.add_argument('--show_events', help='Show available events for selected calendar', action='store_true')
    parser.add_argument('--update_test_event', help='Execute a patch request for testing', action='store_true')
    parser.add_argument('--send_test_email', help='send a test email', action='store_true')
    args = parser.parse_args()

    # Calling graph using the access token
    token = get_access_token()

    if args.show_calendars:
        calendars = execute_user_request(token, "calendars")
        cal_name_id = [(c["name"], c["id"]) for c in calendars["value"]]
        print("Available calendars are:")
        print(json.dumps(cal_name_id, indent=2))

    if args.show_event_entries or args.show_events:
        # A response without "value" is treated like an empty listing.
        events = get_future_calendar_events().get("value") or []
        if args.show_event_entries:
            # dict views are not JSON serialisable, so materialise the keys;
            # an empty listing prints an empty list instead of failing.
            entries = list(events[0]) if events else []
            print(json.dumps(entries, indent=2))
        if args.show_events:
            convert_datetimes(events)
            print(json.dumps(
                [(e["subject"], e["start"], e["duration"], e["attendees"], e["id"])
                 for e in events],
                indent=2))

    if args.update_test_event:
        data = {
            "attendees": [
                {
                    "type": "required",
                    "status": {
                        "response": "none",
                        "time": "0001-01-01T00:00:00Z",
                    },
                    "emailAddress": {
                        "name": "Simone Profus",
                        "address": "simone.profus@propedal.at",
                    },
                },
            ],
        }
        data["attendees"][0]["emailAddress"]["name"] = "Patrick S."
        data["attendees"][0]["emailAddress"]["address"] = "struebin.patrick@gmail.com"
        event_id = "AAMkADY0MDg1MTVjLTg5ZjItNGQxYS04MGQ3LWY2NjJmYjM0YmZhOQBGAAAAAADXD7SdVoWYQI4RYXbBumMEBwAf_ngZxs71RonY3GuLL8TVAADHFw_OAAAf_ngZxs71RonY3GuLL8TVAADQqYjcAAA%3D"
        # update_calendar_event obtains its own token; it takes only the
        # event id and the patch payload.
        ret = update_calendar_event(event_id, data)
        print(json.dumps(ret, indent=2))

    if args.send_test_email:
        with open("templates/auth/activate.html", encoding="utf-8") as f:
            content = f.read()
        print(send_mail("struebin.patrick@gmail.com", "test test", content))

    print("Done.")


if __name__ == "__main__":
    main()