A-A+

SOAP python demo 两例

2019年01月28日 15:16 汪洋大海 暂无评论 阅读 267 views 次

SOAP 协议使用的Python,其中有一个库是zeep,这个库一直有人维护,而且是最新的。

第一个demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# -*-coding:utf-8-*-
from requests.auth import HTTPBasicAuth  # or HTTPDigestAuth, or OAuth1, etc.
from zeep.client import Client
from requests import Session
from zeep.transports import Transport
from zeep import Client as ZeepClient
# from zeep.cache import SqliteCache
from zeep.wsse.username import UsernameToken
 
 
user = 'WathiqSTATSPP'
password = 'bFGT776&UYTRE123'
 
def make_efile_client():
    if DEBUG:
        session = Session()
        session.verify = False
 
        client = ZeepClient("http://www.test.com/", transport=Transport(session=session))
    else:
        # TODO: CHECK WHEN ON PRODUCTION IF AUTH HEADER IS NEEDED AND IF SESSION.VERIFY SHOULD BE FALSE TOO
        client = ZeepClient(EFILE_WEBSERVICE_URL)
 
    return client
 
 
def make_wathiq_client():
    if DEBUG:
        session = Session()
        session.verify = False
 
        token = UsernameToken(user, password)
 
        try:
            client = ZeepClient("http://www.test.com/", transport=Transport(session=session), wsse=token)
        except Exception as e:
            print('EXCEPTION:make_wathiq_client -> ', e)
            client = None
 
    else:
        # TODO: CHECK ON PRODUCTION IF SESSION.VERIFY SHOULD BE FALSE TOO
        client = ZeepClient("http://www.test.com/", wsse=UsernameToken(user, password))
 
    return client
 
 
def test():
    user = 'WathiqSTATSPP'
    password = 'bFGT776&UYTRE123'
    session = Session()
    session.auth = HTTPBasicAuth(user, password)
    client = Client('http://my-endpoint.com/production.svc?wsdl', transport=Transport(session=session))
    print(client.service.bookYear('test 2'))

案例二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import zeep
import logging
from requests import Session
from requests.auth import HTTPBasicAuth
from zeep.transports import Transport
from zeep.wsse.username import UsernameToken
import pandas as pd
import pdb
USERNAME = 'jack.dwyer@blastam.com'
PASSWORD = 'Blast6020'
authentication_dict = {"Username":USERNAME,
   "Password":PASSWORD}
client = zeep.Client(wsdl = 'https://ws.campaigner.com/2013/01/contactmanagement.asmx?WSDL')
 
xml = "<contactssearchcriteria><version major=\"2\" minor=\"0\" build=\"0\" revision=\"0\"/>" +\
"<set>Partial</set><evaluatedefault>True</evaluatedefault>" +\
"<group><filter><filtertype>EmailAction</filtertype>" +\
"<campaign><anycampaign/></campaign>" +\
"<action><status>Do</status><operator>Open</operator></action>" +\
"<operator>Anytime</operator></filter>" +\
"</group></contactssearchcriteria>"
 
runreport = client.service.RunReport(authentication = authentication_dict, xmlContactQuery = xml)
print runreport
ticket = runreport['body']['RunReportResult']['ReportTicketId']
 
response = client.service.DownloadReport(authentication = authentication_dict,
reportTicketId=ticket, fromRow=2, toRow=5, reportType='rpt_Summary_Campaign_Results')['body']['DownloadReportResult']['ReportResult']
dict_list = []
for i in response:
   dict_list.append(i['_attr_1'])
my_dict = zeep.helpers.serialize_object(dict_list)
campaigns_df = pd.DataFrame(my_dict)
print campaigns_df
#
# runids = campaigns_df.CampaignRunId.unique()
# client = zeep.Client(wsdl = 'https://ws.campaigner.com/2013/01/campaignmanagement.asmx?WSDL')
# for id in runids:
#     id = str(id)
#     temp_zeep = client.service.GetTrackedLinkSummaryReport(authentication = authentication_dict, campaignRunId = id)['body']['GetTrackedLinkSummaryReportResult']['TrackedLinkSummaryData']
#     temp_dict = zeep.helpers.serialize_object(temp_zeep)
#     temp_df = pd.DataFrame(temp_dict)
#     summary_df1 = pd.DataFrame()
#     if summary_df1.empty:
#         summary_df1 = temp_df.copy()
#         break
#     else:
#         summary_df1.append(temp_df)
#         break
# print summary_df1
#
# client = zeep.Client(wsdl = 'https://ws.campaigner.com/2013/01/campaignmanagement.asmx?WSDL')
# zeep_dict = client.service.ListCampaigns(authentication = authentication_dict)['body']['ListCampaignsResult']['CampaignDescription']
# my_dict = zeep.helpers.serialize_object(zeep_dict)
# campaigns_df = pd.DataFrame(my_dict)
#
#
# campaigns = campaigns_df.Id.unique()
#
# for id in campaigns:
#     id = str(id)
#     temp_zeep = client.service.ListTrackedLinksByCampaign(authentication = authentication_dict, campaignIds = id)['body']['ListTrackedLinksByCampaignResult']['TrackedLinkDescription']
#     #print temp_zeep
#     temp_dict = zeep.helpers.serialize_object(temp_zeep)
#     temp_df = pd.DataFrame(temp_dict)
#     summary_df2 = pd.DataFrame()
#     if summary_df2.empty:
#         summary_df2 = temp_df.copy()
#         break
#     else:
#         summary_df2.append(temp_df)
#         break
# print summary_df2
#
# #
# #
# client = zeep.Client(wsdl = 'https://ws.campaigner.com/2013/01/contactmanagement.asmx?WSDL')
#
#
# xml = "<contactssearchcriteria><version major=\"2\" minor=\"0\" build=\"0\" revision=\"0\"/>" +\
#     "<set>Partial</set><evaluatedefault>True</evaluatedefault>" +\
#     "<group><filter><filtertype>EmailAction</filtertype>" +\
#     "<campaign><anycampaign/></campaign>"+\
#     "<action><status>Do</status><operator>ClickAnyLink</operator></action>" +\
#     "<operator>PastNDay</operator><value>1</value></filter>" +\
#     "</group></contactssearchcriteria>"
#
#
#
#
# runreport = client.service.RunReport(authentication = authentication_dict, xmlContactQuery = xml)
# print runreport
# ticket = runreport['body']['RunReportResult']['ReportTicketId']
#
# response = client.service.DownloadReport(authentication = authentication_dict,
# reportTicketId=ticket, fromRow=2, toRow=5, reportType='rpt_Summary_Campaign_Results')#['body']['DownloadReportResult']['ReportResult']
# print response
# dict_list = []
# for i in response:
#     dict_list.append(i['_attr_1'])
# my_dict = zeep.helpers.serialize_object(dict_list)
# campaigns_df = pd.DataFrame(my_dict)
# print campaigns_df

数据来源:https://github.com/JackNDwyer/Things_I_have_done/blob/63ca534ad70da780070d43a333dcc0f463215ae1/soap.py
https://github.com/sysalong/license-portal/blob/master/main/services.py

布施恩德可便相知重

微信扫一扫打赏

支付宝扫一扫打赏

×
标签:

给我留言