# Requires the third-party "requests" package:
#     pip install requests
import json
import time

import requests
import requests.packages.urllib3 as urllib3

# Silence urllib3 warnings for the whole process.
urllib3.disable_warnings()

# Credentials used to log in to the NetBrain system.
user = "admin"
pwd = "admin"

# NetBrain system address
host_url = "http://10.10.0.29"

# Headers shared by the HTTP requests in this script.
headers = {
    'Content-Type': 'application/json',
    'Accept': 'application/json',
}

''' Get token for netbrain '''

# Names of the tenant and domain to work with.
TENANT = 'Topo'  # tenant name
DOMAIN = 'Flow'  # domain name
def _print_json(response):
    """Print the decoded JSON body of *response*, or its raw text if it is not JSON.

    Used for the diagnostic prints that happen before the status code is
    checked, so that a non-JSON error body does not abort the script.
    """
    try:
        print(response.json())
    except ValueError:
        # requests raises a ValueError subclass when the body is not valid JSON.
        print(response.text)


def getTokens():
    """Log in with the module-level ``user``/``pwd`` and return the session token.

    Returns:
        The ``"token"`` field of the login response when the HTTP status is
        200, otherwise the string ``"error"``.
    """
    login_api_url = r"/ServicesAPI/API/V1/Session"
    Login_url = host_url + login_api_url
    credentials = {
        "username": user,
        "password": pwd,
    }
    # Named ``response`` (not ``token``) to avoid shadowing the module-level token.
    response = requests.post(
        Login_url, data=json.dumps(credentials), headers=headers, verify=False)
    if response.status_code != 200:
        return "error"
    payload = response.json()
    print(payload)
    return payload["token"]


# get token
# NOTE(review): when the login fails, the literal "error" is stored as the
# Token header and later calls will be sent with it -- confirm this is intended.
token = getTokens()
headers["Token"] = token


def get_tenant_domain_id():
    """Look up the IDs of the tenant ``TENANT`` and its domain ``DOMAIN``.

    Returns:
        A ``(tenant_id, domain_id)`` tuple. Either element is the empty
        string when the corresponding request did not return HTTP 200 or no
        entry with the configured name was found.
    """
    tenant_id = ""
    domain_id = ""

    tenant_id_url = '/ServicesAPI/API/V1/CMDB/Tenants'
    full_url = host_url + tenant_id_url
    data = requests.get(full_url, headers=headers, verify=False)
    _print_json(data)
    if data.status_code != 200:
        return tenant_id, domain_id

    # No early exit: if several tenants share the name, the last one wins.
    for tenant in data.json()['tenants']:
        if TENANT == tenant['tenantName']:
            tenant_id = tenant['tenantId']

    if tenant_id:
        domain_id_url = '/ServicesAPI/API/V1/CMDB/Domains'
        full_domain_url = host_url + domain_id_url
        domain_data = requests.get(
            full_domain_url, params={'tenantId': tenant_id},
            headers=headers, verify=False)
        _print_json(domain_data)
        if domain_data.status_code == 200:
            for domain in domain_data.json()['domains']:
                if DOMAIN == domain['domainName']:
                    domain_id = domain['domainId']

    return tenant_id, domain_id


def Logout():
    """Delete the current session after a two-second pause.

    Returns:
        ``None`` when the server answers HTTP 200, otherwise an error
        message string.
    """
    logout_url = "/ServicesAPI/API/V1/Session"
    time.sleep(2)
    full_url = host_url + logout_url
    body = {
        "token": token,
    }
    result = requests.delete(
        full_url, data=json.dumps(body), headers=headers, verify=False)
    _print_json(result)
    if result.status_code == 200:
        print("LogOut success...")
        return None
    return "errorCode" + "LogOut API test failed... \n"


# Trigger API function
def TriggerTask(API_Body):
    """POST *API_Body* to the Triggers/Run endpoint and return the decoded JSON reply.

    The JSON body is returned regardless of the HTTP status code.
    """
    # Trigger API url
    API_URL = r"/ServicesAPI/API/V1/Triggers/Run"
    api_full_url = host_url + API_URL
    # Trigger API payload
    api_result = requests.post(
        api_full_url, data=json.dumps(API_Body), headers=headers, verify=False)
    return api_result.json()


def stopTask(taskid):
    """POST an empty JSON object to Triggers/Stop/<taskid> and return the raw response.

    Args:
        taskid: task identifier string appended to the stop URL.
    """
    stop_url = '/ServicesAPI/API/V1/Triggers/Stop/'
    full_url = host_url + stop_url + taskid
    data = {}
    api_result = requests.post(
        full_url, data=json.dumps(data), headers=headers, verify=False)
    return api_result


if __name__ == "__main__":
    tenant_id, domain_id = get_tenant_domain_id()
    print(tenant_id, domain_id)
    # To skip the lookup, the IDs can be hard-coded instead, e.g.:
    # tenant_id = 'd5436528-2b8a-e908-4c39-b07e7aaa11a9'
    # domain_id = '22d163f6-e2b1-48ca-99e8-88ab47809db4'
    API_BODY = {
        'domain_setting': {
            'tenant_id': tenant_id,
            'domain_id': domain_id,
        },
        'basic_setting': {
            'user': 'admin',
            'device': 'BJ*POP',  # any device in NetBrain system
            'stub_name': 'stub2',  # API task to trigger
            'triggered_by': "Netbrain",
        },
        'map_setting': {
            'map_create_mode': 3,  # 3 means creating a path map
            'map_path_para': {
                'source': '172.24.31.195',
                'destination': '10.10.7.235',
                'direction': 1,  # 1 means creating one-way path
                'isLiveUseBaseLineConfig': True,
                'advancedOption': {
                    'debugMode': False,
                    'calcL3ActivePath': False,
                    'useCommandsWithArguments': False,
                    'calcWhenDeniedByACL': False,
                    'calcWhenDeniedByPolicy': False,
                    'pathFixUp': False,
                },
            },
        },
    }
    print(TriggerTask(API_BODY))
    # NOTE(review): Logout() is defined but never called here, so the session
    # opened at import time is left open -- confirm whether that is intended.