255 lines
8.5 KiB
Python
255 lines
8.5 KiB
Python
import json
|
|
from datetime import datetime, timedelta
|
|
import logging
|
|
import argparse
|
|
|
|
import requests
|
|
import msal
|
|
from config import MsalConfig
|
|
|
|
try:
|
|
from zoneinfo import ZoneInfo
|
|
except ImportError:
|
|
from backports.zoneinfo import ZoneInfo
|
|
|
|
WEEKDAYS= {0:"Mo", 1:"Di", 2:"Mi", 3:"Do", 4: "Fr", 5:"Sa", 6: "So"}
|
|
# Optional logging
|
|
|
|
# logging.getLogger("msal").setLevel(logging.INFO) # Optionally disable MSAL DEBUG logs
|
|
|
|
# Create a preferably long-lived app instance which maintains a token cache.
|
|
app = msal.ConfidentialClientApplication(
|
|
MsalConfig.CLIENT_ID, authority=MsalConfig.AUTHORITY,
|
|
client_credential=MsalConfig.SECRET,
|
|
# token_cache=... # Default cache is in memory only.
|
|
# You can learn how to use SerializableTokenCache from
|
|
# https:#msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache
|
|
)
|
|
|
|
def get_access_token():
|
|
global app
|
|
|
|
# The pattern to acquire a token looks like this.
|
|
result = None
|
|
|
|
# Firstly, looks up a token from cache
|
|
# Since we are looking for token for the current app, NOT for an end user,
|
|
# notice we give account parameter as None.
|
|
result = app.acquire_token_silent(MsalConfig.SCOPE, account=None)
|
|
|
|
if result is None:
|
|
logging.info("No suitable token exists in cache. Let's get a new one from AAD.")
|
|
result= app.acquire_token_for_client(scopes=MsalConfig.SCOPE)
|
|
else:
|
|
logging.info("Token was found in cache.")
|
|
|
|
if not "access_token" in result: # a final check
|
|
logging.error(result.get("error"))
|
|
logging.error(result.get("error_description"))
|
|
logging.error(result.get("correlation_id")) # You may need this when reporting a bug
|
|
raise AssertionError("Was not able to get an access token. Check msal auth.")
|
|
|
|
return result
|
|
|
|
def execute_get_request(token: dict, endpoint:str):
|
|
return requests.get( # Use token to call downstream service
|
|
endpoint,
|
|
headers={'Authorization': 'Bearer ' + token['access_token']},).json()
|
|
|
|
def execute_post_request(token, endpoint, data):
|
|
return requests.post(
|
|
endpoint,
|
|
json=data,
|
|
#data=data,
|
|
headers={'Authorization': 'Bearer ' + token['access_token']})
|
|
|
|
def execute_patch_request(token, endpoint, data):
|
|
return requests.patch(
|
|
endpoint,
|
|
json=data,
|
|
#data=data,
|
|
headers={'Authorization': 'Bearer ' + token['access_token']},).json()
|
|
|
|
def send_mail(to, subject, content, user_id=MsalConfig.USER_ID):
|
|
|
|
token=get_access_token()
|
|
|
|
mail = {
|
|
"message": {
|
|
"subject": subject,
|
|
"body": {
|
|
"contentType": "html",
|
|
"content": content
|
|
},
|
|
"toRecipients": [
|
|
{
|
|
"emailAddress": {
|
|
"address": to
|
|
}
|
|
}
|
|
],
|
|
},
|
|
"saveToSentItems": "false"
|
|
}
|
|
|
|
return execute_post_request(token, "https://graph.microsoft.com/v1.0/users/" + user_id + "/sendMail", mail)
|
|
|
|
def update_calendar_event(event_id, data, user_id=MsalConfig.USER_ID):
|
|
token = get_access_token()
|
|
|
|
endpoint= "https://graph.microsoft.com/v1.0/users/" + user_id + f"/calendar/events/{event_id}"
|
|
return execute_patch_request(token, endpoint, data)
|
|
|
|
def execute_user_request(token, endpoint, user_id=MsalConfig.USER_ID):
|
|
return execute_get_request(token, "https://graph.microsoft.com/v1.0/users/" + user_id + f"/{endpoint}")
|
|
|
|
def get_all_calendar_events():
|
|
token = get_access_token()
|
|
|
|
return execute_user_request(token, f"calendars/{MsalConfig.CALENDAR_ID}/events").get("value")
|
|
|
|
def get_calendar_event(id, filter: str=""):
|
|
token = get_access_token()
|
|
|
|
return execute_user_request(token, f"calendars/{MsalConfig.CALENDAR_ID}/events/{id}{filter}")
|
|
|
|
def get_future_calendar_events():
|
|
|
|
t1=datetime.now()
|
|
t2=t1 + timedelta(days=365)
|
|
t1 = t1.strftime("%Y-%m-%d")
|
|
t2 = t2.strftime("%Y-%m-%d")
|
|
|
|
filter = f"?StartDateTime={t1}&EndDateTime={t2}" + "&$top=300"
|
|
|
|
token = get_access_token()
|
|
|
|
return execute_user_request(token, f"calendars/{MsalConfig.CALENDAR_ID}/calendarview{filter}")
|
|
|
|
|
|
def convert_datetime(event):
|
|
start_date_time = datetime.fromisoformat(event["start"]["dateTime"][:-1]) \
|
|
.replace(tzinfo=ZoneInfo('UTC')).astimezone(ZoneInfo(MsalConfig.TIME_ZONE))
|
|
|
|
stop_date_time = datetime.fromisoformat(event["end"]["dateTime"][:-1]) \
|
|
.replace(tzinfo=ZoneInfo('UTC')).astimezone(ZoneInfo(MsalConfig.TIME_ZONE))
|
|
|
|
event["start"]["date"] = start_date_time.strftime("%d.%m.%Y")
|
|
event["start"]["time"] = start_date_time.strftime("%H:%M:%S")
|
|
event["end"]["date"] = stop_date_time.strftime("%d.%m.%Y")
|
|
event["end"]["time"] = stop_date_time.strftime("%H:%M:%S")
|
|
event["duration"] = str(stop_date_time - start_date_time)[:-3]
|
|
event["weekday"] = WEEKDAYS[start_date_time.weekday()]
|
|
return event
|
|
|
|
def convert_datetimes(events):
|
|
for event in events:
|
|
convert_datetime(event)
|
|
|
|
return events
|
|
|
|
|
|
def add_attendee(data, name, email):
|
|
d = {
|
|
"type": "required",
|
|
"status": {
|
|
"response": "none",
|
|
"time": "0001-01-01T00:00:00Z"
|
|
},
|
|
"emailAddress": {
|
|
"name": name,
|
|
"address": email
|
|
}
|
|
}
|
|
|
|
data["attendees"].append(d)
|
|
|
|
def delte_attendee(data, email):
|
|
for i, d in enumerate(data["attendees"]):
|
|
if d["emailAddress"]["address"] == email:
|
|
del data["attendees"][i]
|
|
break
|
|
|
|
def check_attendee_presence(data, email):
|
|
for d in data["attendees"]:
|
|
if d["emailAddress"]["address"] == email:
|
|
return True
|
|
|
|
return False
|
|
|
|
if __name__ == "__main__":
|
|
|
|
logging.basicConfig(format='%(asctime)s,%(msecs)d %(levelname)-4s [%(filename)s:%(lineno)d] %(message)s',
|
|
datefmt='%Y-%m-%d:%H:%M:%S',
|
|
level=logging.INFO)
|
|
|
|
parser = argparse.ArgumentParser(description='Process some integers.')
|
|
parser.add_argument('--show_calendars', help='Show available calendars', action='store_true')
|
|
parser.add_argument('--show_event_entries', help='Show available event fields', action='store_true')
|
|
parser.add_argument('--show_events', help='Show available events for selected calendar', action='store_true')
|
|
parser.add_argument('--update_test_event', help='Execute a patch request for testing', action='store_true')
|
|
parser.add_argument('--send_test_email', help='send a test email', action='store_true')
|
|
parser.add_argument('--get_future_alendar_events', help='gets future calendar events', action='store_true')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Calling graph using the access token
|
|
token = get_access_token()
|
|
|
|
if args.show_calendars:
|
|
calendars = execute_user_request(token, "calendars")
|
|
cal_name_id = [(c["name"], c["id"]) for c in calendars["value"]]
|
|
|
|
print("Available calendars are:")
|
|
print(json.dumps(cal_name_id, indent=2))
|
|
|
|
if args.show_event_entries or args.show_events:
|
|
events= get_future_calendar_events().get("value")
|
|
|
|
if args.show_event_entries:
|
|
print(json.dumps(list(events)[0].keys(), indent=2))
|
|
|
|
if args.show_events:
|
|
#print(json.dumps(events, indent=2))
|
|
|
|
convert_datetimes(events)
|
|
convert_datetime(events[0])
|
|
|
|
print(json.dumps([(e["subject"], e["start"], e["duration"], e["attendees"], e["id"]) for e in events], indent=2))
|
|
|
|
if args.update_test_event:
|
|
|
|
data = {
|
|
"attendees" :
|
|
[
|
|
{
|
|
"type": "required",
|
|
"status": {
|
|
"response": "none",
|
|
"time": "0001-01-01T00:00:00Z"
|
|
},
|
|
"emailAddress": {
|
|
"name": "Simone Profus",
|
|
"address": "simone.profus@propedal.at"
|
|
}
|
|
}
|
|
],
|
|
}
|
|
|
|
data["attendees"][0]["emailAddress"]["name"] = "Patrick S."
|
|
data["attendees"][0]["emailAddress"]["address"] = "struebin.patrick@gmail.com"
|
|
|
|
event_id = "AAMkADY0MDg1MTVjLTg5ZjItNGQxYS04MGQ3LWY2NjJmYjM0YmZhOQBGAAAAAADXD7SdVoWYQI4RYXbBumMEBwAf_ngZxs71RonY3GuLL8TVAADHFw_OAAAf_ngZxs71RonY3GuLL8TVAADQqYjcAAA%3D"
|
|
ret = get_calendar_event(event_id)
|
|
ret = update_calendar_event(token, event_id, data)
|
|
|
|
print(json.dumps(ret, indent=2))
|
|
|
|
if args.send_test_email:
|
|
|
|
print(send_mail("struebin.patrick@gmail.com", "test test", "bla"))
|
|
|
|
if args.get_future_alendar_events:
|
|
events = get_future_calendar_events()
|
|
|
|
print("Done.") |