Sample python code using REST API
import json
import datetime
import time
import requests
PULSE_IT_HOST_ADDR = "http://172.16.29.130"
PULSE_IT_USER = "admin"
PULSE_IT_PASSWORD = "admin"
def get_sequence_templates():
try:
r = requests.get(
url="{pulse_it_host_addr}/sequence_template/rest/list".format(pulse_it_host_addr=PULSE_IT_HOST_ADDR),
auth=(PULSE_IT_USER, PULSE_IT_PASSWORD),
params=None,
)
if r.status_code == requests.codes.ok:
try:
result = r.json()
return result["response"]
except Exception as e:
print("Unable to call api endpoint: {}".format(r.text))
return {}
else:
print("An error occurred. API returned {} code".format(r.status_code))
except Exception as e:
print("An error occurred: {}".format(e))
def create_sequence(sequence_template):
a_color = "#EB5937"
a_user_message = "A user message"
a_description = "Lorem ipsum dolor sit amet, consectetur adipiscing elit."
try:
r = requests.post(
url="{pulse_it_host_addr}/sequence/rest/create_and_start".format(pulse_it_host_addr=PULSE_IT_HOST_ADDR),
verify=False,
auth=(PULSE_IT_USER, PULSE_IT_PASSWORD),
params=None,
data={
"template_id": sequence_template["id"],
"title": sequence_template["name"] + datetime.datetime.now().strftime("%Y%m%d%H%M"),
"deadline": datetime.date.today() + datetime.timedelta(days=1),
"color": a_color,
"user_message": a_user_message,
"thumbnail": None,
"allocation_date": None,
"allocation_user": None,
"priority": 3,
"input_context": json.dumps({
"description": a_description,
})
},
)
if r.status_code == requests.codes.ok:
try:
res = r.json()
if res["result"]:
print("Sequence {} started ".format(res["response"]["sequence_id"]))
return res["response"]
else:
print("Unable to start sequence: {}".format(res["response"]))
except Exception as e:
print("Unable to call api endpoint: {}".format(r.text))
return {}
else:
print("An error occurred. API returned {} code".format(r.status_code))
except Exception as e:
print("An error occurred: {}".format(e))
def get_sequence_data(sequence_id):
try:
r = requests.get(
url="{pulse_it_host_addr}/sequence/rest/data".format(pulse_it_host_addr=PULSE_IT_HOST_ADDR),
auth=(PULSE_IT_USER, PULSE_IT_PASSWORD),
params={
"sequence_id": sequence_id
},
)
if r.status_code == requests.codes.ok:
try:
result = r.json()
return result["response"]
except Exception as e:
print("Unable to call api endpoint: {}".format(r.text))
return {}
else:
print("An error occurred. API returned {} code".format(r.status_code))
except Exception as e:
print("An error occurred: {}".format(e))
def main():
sequence_templates = get_sequence_templates()
started_sequence = create_sequence(sequence_templates[0])
sequence_data = get_sequence_data(started_sequence["sequence_id"])
time.sleep(5)
print(sequence_data["seq_state"])
print("Progress: {}%".format(sequence_data["progress"]["current"]))
if __name__ == "__main__":
main()