Calling Tasks in Third-Party Systems

If the third-party API server supports Python script inputs, you can launch the predefined API-triggered diagnosis tasks with the following sample script (replace the sample parameters with real values):

import requests
import json
import time
import requests.packages.urllib3 as urllib3
 
# Silence urllib3 warnings. The requests in this script are sent with
# verify=False, which urllib3 otherwise warns about for HTTPS requests.
urllib3.disable_warnings()
# Requires the third-party "requests" package for Python:
#   pip install requests
 
# Sample credentials -- replace with a real NetBrain account.
user = "NetBrain"
pwd = "NetBrain"
# Base URL of the NetBrain web server. No trailing slash: every endpoint path
# appended to it in this script already starts with "/".
host_url = "http(s)://<IP address of NetBrain Web Server>"
# Headers sent with every request. This must be a dict (key: value pairs):
# the session token is stored in it under the "Token" key after login.
headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}

# Names of the tenant and domain that get_tenant_domain_id() resolves to IDs.
TENANT = 'BVT_DB2TEN_83880'
DOMAIN = 'zjtest'
 
 
def getTokens():
    """Log in to the session endpoint and return the session token.

    Posts the module-level ``user`` / ``pwd`` credentials as JSON to
    ``/ServicesAPI/API/V1/Session``.

    Returns:
        The value of the ``"token"`` field of the JSON response when the
        HTTP status is 200, otherwise the literal string ``"error"``.
    """
    session_url = host_url + r"/ServicesAPI/API/V1/Session"
    credentials = {"username": user, "password": pwd}
    response = requests.post(
        session_url,
        data=json.dumps(credentials),
        headers=headers,
        verify=False,
    )
    # Guard clause: anything other than HTTP 200 is reported as "error".
    if response.status_code != 200:
        return "error"
    print(response.json())
    return response.json()["token"]
# Log in as soon as the module is loaded. The module-level `token` is reused
# by Logout(), and the "Token" header is sent with every later request.
# Note: getTokens() returns the string "error" on a failed login, and that
# value is stored here unchecked.
token = getTokens()
headers["Token"] = token
 
 
 
def get_tenant_domain_id():
    """Resolve the TENANT and DOMAIN names to their IDs.

    Lists the tenants, picks the one whose ``tenantName`` equals ``TENANT``,
    then lists that tenant's domains and picks the one whose ``domainName``
    equals ``DOMAIN``. If several entries match, the last one wins.

    Returns:
        A ``(tenant_id, domain_id)`` tuple. Either element is the empty
        string when it could not be resolved (non-200 response or no entry
        with a matching name).
    """
    tenant_id = ""
    domain_id = ""

    tenants_resp = requests.get(
        host_url + '/ServicesAPI/API/V1/CMDB/Tenants',
        headers=headers,
        verify=False,
    )
    if tenants_resp.status_code != 200:
        return tenant_id, domain_id

    for tenant_entry in tenants_resp.json()['tenants']:
        if TENANT == tenant_entry['tenantName']:
            tenant_id = tenant_entry['tenantId']

    # Without a tenant there is nothing to query domains for.
    if not tenant_id:
        return tenant_id, domain_id

    domains_resp = requests.get(
        host_url + '/ServicesAPI/API/V1/CMDB/Domains',
        params={'tenantId': tenant_id},
        headers=headers,
        verify=False,
    )
    # The domain response is printed before its status code is inspected.
    print(domains_resp.json())
    if domains_resp.status_code == 200:
        for domain_entry in domains_resp.json()['domains']:
            if DOMAIN == domain_entry['domainName']:
                domain_id = domain_entry['domainId']

    return tenant_id, domain_id
def Logout():
    """End the current session by posting the module-level token.

    Waits two seconds, posts ``{"token": token}`` as JSON to
    ``/ServicesAPI/API/logout`` and prints the JSON response.

    Returns:
        ``None`` when the response's ``"statusCode"`` is 0 (a success message
        is printed), otherwise an error message string.
    """
    time.sleep(2)
    response = requests.post(
        host_url + "/ServicesAPI/API/logout",
        data=json.dumps({"token": token}),
        headers=headers,
        verify=False,
    )
    print(response.json())
    if response.json()["statusCode"] != 0:
        return "errorCode" + "LogOut API test failed... "
    print("LogOut success...")
# Trigger API function
 
 
def TriggerTask(API_Body):
    """Post a trigger payload and return the decoded JSON response.

    Args:
        API_Body: JSON-serialisable request body; it is sent to
            ``/ServicesAPI/API/V1/Triggers/Run``.

    Returns:
        The parsed JSON body of the response, whatever the HTTP status
        code is.
    """
    trigger_url = host_url + r"/ServicesAPI/API/V1/Triggers/Run"
    response = requests.post(
        trigger_url,
        data=json.dumps(API_Body),
        headers=headers,
        verify=False,
    )
    # Success and failure responses are both handed back as parsed JSON.
    return response.json()
 
 
def stopTask(taskid):
    """Post a stop request for the given task.

    Args:
        taskid: Value sent as ``task_id`` in the JSON request body.

    Returns:
        The raw ``requests`` response object from
        ``/ServicesAPI/API/StopTask`` (not decoded).
    """
    endpoint = host_url + '/ServicesAPI/API/StopTask'
    payload = json.dumps({'task_id': taskid})
    return requests.post(endpoint, data=payload, headers=headers, verify=False)
 
if __name__ == "__main__":
    # Sample tenant/domain IDs -- replace them with the IDs of your own
    # environment, or resolve them from the TENANT / DOMAIN names instead:
    # tenant_id, domain_id = get_tenant_domain_id()
    # print(tenant_id, domain_id)
    tenant_id = 'e8bd5e21-1f76-2e30-086d-5bfa186c144d'
    domain_id = '92b19d13-8859-4836-a4fb-ce85f5edf657'

    # Request body passed to TriggerTask(). Every entry must be written as a
    # "key: value" pair; without the colons the literal is a syntax error.
    API_BODY = {
        'domain_setting': {
            'tenant_id': tenant_id,
            'domain_id': domain_id,
        },
        'basic_setting': {
            'user': 'admin',
            'device': "BJ*POP",
            'stub_name': 'stub4',
            # 'stub_name': 'stub1',
            'triggered_by': "Netbrain",
            # 'intf_name': 'FastEthernet0/0'
        },
    }

    # print(get_tenant_domain_id())

    # Leftover fragment of a payload that also carried a "map_setting"
    # section; kept for reference.
    #         "map_setting":
    #     {
    #         "map_create_mode":3,
    #         "map_path_para":
    #         {
    #             "is_live":True,
    #             "source":"GW2Lab",
    #             "destination":"172.24.101.22",
    #             # "source_port":1025,
    #             # "destination_port":80
    #         }
    #     }
    # }

    # Alternative payload that identifies the tenant and domain by name
    # instead of by ID.
    # API_BODY = {
    #         'domain_setting':
    #         {
    #             'tenant_name': TENANT,
    #             'domain_name': DOMAIN
    #         },
    #         'basic_setting':
    #         {
    #             'user': 'admin',
    #             'device': "BJ*POP",
    #             'stub_name': 'runbook',
    #             # 'stub_name': 'stub1',
    #             'triggered_by':"Netbrain",
    #             # 'intf_name': 'FastEthernet0/0'
    #        }
    # }

    print(TriggerTask(API_BODY))

    # Example payload with "ping_setting" and "tracert_setting" sections.
    # body2={
    #     'domain_setting':
    #     {
    #         'tenant_name': TENANT,
    #         'domain_name': DOMAIN
    #     },
    #     'basic_setting':
    #     {
    #         'user': 'wangzheng',
    #         'device': 'MPLS_CE2',
    #         'stub_name': r'Ping and TraceRoute'
    #     },
    #     'ping_setting':{
    #         'source':"MPLS_PE2",
    #         "source_interface":"Ethernet0/0",
    #         "destination":"MPLS_CE1",
    #         "destination_interface":"FastEthernet0/0",
    #         'timeout_seconds':3,
    #         'packet_bytes':651,
    #         'packet_count':1111
    #
    #     },
    #     'tracert_setting':{
    #         'source':'MPLS_CE1',
    #         'source_interface':'Ethernet1/0',
    #         'destination':"MPLS_CE2",
    #         'destination_interface':'Ethernet2/0',
    #         'max_hops':30
    #     }
    # }
    # print(TriggerTask(body2))