A-A+
SOAP python demo 两例
在 Python 中使用 SOAP 协议时,可以选用 zeep 库;这个库一直有人维护,版本也保持更新。
案例一:
# -*-coding:utf-8-*-
# Demo: building zeep SOAP clients with a custom requests session and a
# WS-Security UsernameToken.
#
# NOTE(review): DEBUG and EFILE_WEBSERVICE_URL are read by the factories below
# but are not defined anywhere in this snippet -- presumably they come from the
# host project's settings; they must be in scope before the factories run.
from requests.auth import HTTPBasicAuth  # or HTTPDigestAuth, or OAuth1, etc.
from zeep.client import Client
from requests import Session
from zeep.transports import Transport
from zeep import Client as ZeepClient
# from zeep.cache import SqliteCache
from zeep.wsse.username import UsernameToken

# Credentials handed to UsernameToken by make_wathiq_client().
# NOTE(review): hard-coded secrets -- consider loading them from configuration.
user = 'WathiqSTATSPP'
password = 'bFGT776&UYTRE123'


def make_efile_client():
    """Build the zeep client for the e-file web service.

    When ``DEBUG`` is truthy the client is created for the test URL on a
    requests session whose ``verify`` flag is set to ``False``; otherwise it
    is created from ``EFILE_WEBSERVICE_URL`` with zeep's default transport.

    Returns:
        The constructed ``ZeepClient``.
    """
    if not DEBUG:
        # TODO: CHECK WHEN ON PRODUCTION IF AUTH HEADER IS NEEDED AND IF SESSION.VERIFY SHOULD BE FALSE TOO
        return ZeepClient(EFILE_WEBSERVICE_URL)

    unverified_session = Session()
    unverified_session.verify = False
    return ZeepClient(
        "http://www.test.com/",
        transport=Transport(session=unverified_session),
    )


def make_wathiq_client():
    """Build the zeep client that signs requests with a ``UsernameToken``.

    When ``DEBUG`` is truthy the client uses a requests session whose
    ``verify`` flag is ``False``; any exception raised while constructing it
    is printed and ``None`` is returned instead. Otherwise the client is
    created with zeep's default transport and exceptions propagate.

    Returns:
        The constructed ``ZeepClient``, or ``None`` if construction failed in
        the ``DEBUG`` branch.
    """
    if not DEBUG:
        # TODO: CHECK ON PRODUCTION IF SESSION.VERIFY SHOULD BE FALSE TOO
        return ZeepClient("http://www.test.com/", wsse=UsernameToken(user, password))

    unverified_session = Session()
    unverified_session.verify = False
    credentials = UsernameToken(user, password)
    try:
        return ZeepClient(
            "http://www.test.com/",
            transport=Transport(session=unverified_session),
            wsse=credentials,
        )
    except Exception as err:
        # Best-effort: report the failure and let the caller deal with None.
        print('EXCEPTION:make_wathiq_client -> ', err)
        return None


def test():
    """Call ``bookYear('test 2')`` over an HTTP-Basic-authenticated session and print the result."""
    login, secret = 'WathiqSTATSPP', 'bFGT776&UYTRE123'

    authed_session = Session()
    authed_session.auth = HTTPBasicAuth(login, secret)
    soap_transport = Transport(session=authed_session)

    client = Client('http://my-endpoint.com/production.svc?wsdl', transport=soap_transport)
    result = client.service.bookYear('test 2')
    print(result)
案例二:
# Demo: run a Campaigner contact report over SOAP with zeep and load the
# downloaded rows into a pandas DataFrame.
#
# The original listing used Python 2 ``print`` statements; they are written as
# ``print(...)`` calls here so the script parses on Python 3.
import logging
import pdb

import pandas as pd
import zeep
from requests import Session
from requests.auth import HTTPBasicAuth
# Imported explicitly rather than relying on ``zeep.helpers`` having been
# loaded as a side effect of ``import zeep``.
from zeep.helpers import serialize_object
from zeep.transports import Transport
from zeep.wsse.username import UsernameToken

# NOTE(review): hard-coded credentials -- consider loading them from
# configuration or the environment instead of committing them.
USERNAME = 'jack.dwyer@blastam.com'
PASSWORD = 'Blast6020'
# Passed as the ``authentication`` argument of every service call below.
authentication_dict = {"Username": USERNAME, "Password": PASSWORD}

client = zeep.Client(wsdl='https://ws.campaigner.com/2013/01/contactmanagement.asmx?WSDL')

# Contact search criteria sent as ``xmlContactQuery``. Adjacent literals are
# concatenated by the parser, giving the same string as the original ``+``
# chain.
xml = (
    "<contactssearchcriteria><version major=\"2\" minor=\"0\" build=\"0\" revision=\"0\"/>"
    "<set>Partial</set><evaluatedefault>True</evaluatedefault>"
    "<group><filter><filtertype>EmailAction</filtertype>"
    "<campaign><anycampaign/></campaign>"
    "<action><status>Do</status><operator>Open</operator></action>"
    "<operator>Anytime</operator></filter>"
    "</group></contactssearchcriteria>"
)

# Step 1: start the report and read the ticket id from the response.
runreport = client.service.RunReport(authentication=authentication_dict, xmlContactQuery=xml)
print(runreport)
ticket = runreport['body']['RunReportResult']['ReportTicketId']

# Step 2: download rows 2..5 of the report identified by the ticket.
response = client.service.DownloadReport(
    authentication=authentication_dict,
    reportTicketId=ticket,
    fromRow=2,
    toRow=5,
    reportType='rpt_Summary_Campaign_Results',
)['body']['DownloadReportResult']['ReportResult']

# Step 3: take the '_attr_1' entry of each row, convert the zeep objects to
# plain Python containers, and build the DataFrame.
dict_list = [row['_attr_1'] for row in response]
my_dict = serialize_object(dict_list)
campaigns_df = pd.DataFrame(my_dict)
print(campaigns_df)

# ---------------------------------------------------------------------------
# Alternative queries from the original listing, kept (disabled) for reference.
# ---------------------------------------------------------------------------
#
# runids = campaigns_df.CampaignRunId.unique()
# client = zeep.Client(wsdl = 'https://ws.campaigner.com/2013/01/campaignmanagement.asmx?WSDL')
# for id in runids:
#     id = str(id)
#     temp_zeep = client.service.GetTrackedLinkSummaryReport(authentication = authentication_dict, campaignRunId = id)['body']['GetTrackedLinkSummaryReportResult']['TrackedLinkSummaryData']
#     temp_dict = zeep.helpers.serialize_object(temp_zeep)
#     temp_df = pd.DataFrame(temp_dict)
#     summary_df1 = pd.DataFrame()
#     if summary_df1.empty:
#         summary_df1 = temp_df.copy()
#         break
#     else:
#         summary_df1.append(temp_df)
#         break
# print(summary_df1)
#
# client = zeep.Client(wsdl = 'https://ws.campaigner.com/2013/01/campaignmanagement.asmx?WSDL')
# zeep_dict = client.service.ListCampaigns(authentication = authentication_dict)['body']['ListCampaignsResult']['CampaignDescription']
# my_dict = zeep.helpers.serialize_object(zeep_dict)
# campaigns_df = pd.DataFrame(my_dict)
#
# campaigns = campaigns_df.Id.unique()
#
# for id in campaigns:
#     id = str(id)
#     temp_zeep = client.service.ListTrackedLinksByCampaign(authentication = authentication_dict, campaignIds = id)['body']['ListTrackedLinksByCampaignResult']['TrackedLinkDescription']
#     #print(temp_zeep)
#     temp_dict = zeep.helpers.serialize_object(temp_zeep)
#     temp_df = pd.DataFrame(temp_dict)
#     summary_df2 = pd.DataFrame()
#     if summary_df2.empty:
#         summary_df2 = temp_df.copy()
#         break
#     else:
#         summary_df2.append(temp_df)
#         break
# print(summary_df2)
#
# client = zeep.Client(wsdl = 'https://ws.campaigner.com/2013/01/contactmanagement.asmx?WSDL')
#
# xml = "<contactssearchcriteria><version major=\"2\" minor=\"0\" build=\"0\" revision=\"0\"/>" +\
#     "<set>Partial</set><evaluatedefault>True</evaluatedefault>" +\
#     "<group><filter><filtertype>EmailAction</filtertype>" +\
#     "<campaign><anycampaign/></campaign>"+\
#     "<action><status>Do</status><operator>ClickAnyLink</operator></action>" +\
#     "<operator>PastNDay</operator><value>1</value></filter>" +\
#     "</group></contactssearchcriteria>"
#
# runreport = client.service.RunReport(authentication = authentication_dict, xmlContactQuery = xml)
# print(runreport)
# ticket = runreport['body']['RunReportResult']['ReportTicketId']
#
# response = client.service.DownloadReport(authentication = authentication_dict,
#     reportTicketId=ticket, fromRow=2, toRow=5, reportType='rpt_Summary_Campaign_Results')#['body']['DownloadReportResult']['ReportResult']
# print(response)
# dict_list = []
# for i in response:
#     dict_list.append(i['_attr_1'])
# my_dict = zeep.helpers.serialize_object(dict_list)
# campaigns_df = pd.DataFrame(my_dict)
# print(campaigns_df)
数据来源:https://github.com/JackNDwyer/Things_I_have_done/blob/63ca534ad70da780070d43a333dcc0f463215ae1/soap.py
https://github.com/sysalong/license-portal/blob/master/main/services.py
布施恩德可便相知重
微信扫一扫打赏
支付宝扫一扫打赏