Square POS
import requests
from datetime import datetime, timedelta
import argparse
####### CONSTANTS (need to be filled out before using) #######
SQUARE_API_URL = "https://connect.squareup.com"
SQUARE_ACCESS_TOKEN = ""
LOCATION_IDS = []
LIVEREACH_API_URL = "https://prism.livereach.ai"
LIVEREACH_API_KEY = ""
CAMERA_ID = ""
CUSTOM_EVENT_UID = ""
####### TO CUSTOMIZE (change as you see fit) #######
# To see possible fields: https://developer.squareup.com/reference/square/objects/Order
def parse_order(order: dict) -> dict:
return {
"item": order["line_items"][0]["name"],
"order_number": order["id"][:5],
"price": float(order["total_money"]["amount"]) / 100,
"size": order["line_items"][0]["variation_name"],
}
####### MAIN #######
def main(exe_date, last_x_minutes: int):
# Get orders from Square API
response = requests.post(
f"{SQUARE_API_URL}/v2/orders/search",
json={
"location_ids": LOCATION_IDS,
"query": {
"filter": {
"date_time_filter": {
"closed_at": {
"start_at": iso8601_x_minutes_ago(exe_date, last_x_minutes),
"end_at": exe_date
}
},
"state_filter": {"states": ["COMPLETED"]}
},
"sort": {"sort_field": "CLOSED_AT"}
}
},
headers={"Authorization": f"Bearer {SQUARE_ACCESS_TOKEN}"}
)
if response.status_code != 200:
print(
"Request to {} returned a {}: {}".format(SQUARE_API_URL, response.status_code, response.text)
)
return
orders = response.json().get("orders", [])
# Parse orders and send them to Verkada
for order in orders:
event = parse_order(order)
create_event_response = requests.post(
f"{LIVEREACH_API_URL}/v1/schema/square-pos/events",
json={
"camera_id": CAMERA_ID,
"timestamp": order['closed_at'],
**event
},
headers={"Authorization": f"Api-Key {LIVEREACH_API_KEY}"}
)
assert create_event_response.status_code == 200, create_event_response.text
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Fetch data from point-of-sale services and save as custom events"
)
parser.add_argument(
'-x', '--execution_date', type=str, required=False, default=datetime.now().isoformat(), help="date of execution"
)
parser.add_argument(
'-l', '--last_x_minutes', type=int, required=False, default=15, help="Retrieve sales from the last X minutes"
)
cmdline = parser.parse_args()
exe_date = cmdline.execution_date
last_x_minutes = cmdline.last_x_minutes
main(exe_date, last_x_minutes)Last updated