1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 from __future__ import absolute_import
21
22
23 import heapq
24 import json
25 import logging
26 import re
27 import os
28 import time
29 import traceback
30 import uuid
31
32 from cproton import PN_PYREF, PN_ACCEPTED, PN_EVENT_NONE
33
34 from ._delivery import Delivery
35 from ._endpoints import Connection, Endpoint, Link, Session, Terminus
36 from ._exceptions import SSLUnavailable
37 from ._data import Described, symbol, ulong
38 from ._message import Message
39 from ._transport import Transport, SSL, SSLDomain
40 from ._url import Url
41 from ._common import isstring, unicode2utf8, utf82unicode
42 from ._events import Collector, EventType, EventBase, Handler, Event
43 from ._selectable import Selectable
44
45 from ._handlers import OutgoingMessageHandler, IOHandler
46
47 from ._io import IO, PN_INVALID_SOCKET
48
49 from . import _compat
50 from ._compat import queue
51
52
53 _logger = logging.getLogger("proton")
58
62
63
64 -class Task(object):
65
66 - def __init__(self, reactor, deadline, handler):
71
73 return self._deadline < rhs._deadline
74
76 self._cancelled = True
77
78 @property
81
82 @property
85
106
108
109 - def __init__(self, *handlers, **kwargs):
110 self._previous = PN_EVENT_NONE
111 self._timeout = 0
112 self.mark()
113 self._yield = False
114 self._stop = False
115 self._collector = Collector()
116 self._selectable = None
117 self._selectables = 0
118 self._global_handler = IOHandler()
119 self._handler = Handler()
120 self._timerheap = []
121 self._timers = 0
122 self.errors = []
123 for h in handlers:
124 self.handler.add(h, on_error=self.on_error)
125
127 self.errors.append(info)
128 self.yield_()
129
130
131
132
134 """
135 Return a proxy handler that dispatches to the provided handler.
136
137 If handler throws an exception then on_error is called with info
138 """
139 return handler
140
142 return self._global_handler
143
145 self._global_handler = self._make_handler(handler)
146
147 global_handler = property(_get_global, _set_global)
148
151
154
155 timeout = property(_get_timeout, _set_timeout)
156
159
161 """ This sets the reactor now instant to the current time """
162 self._now = _now()
163 return self._now
164
165 @property
168
171
173 self._handler = self._make_handler(handler)
174
175 handler = property(_get_handler, _set_handler)
176
178
179 self.timeout = 3.14159265359
180 self.start()
181 while self.process(): pass
182 self.stop()
183 self.process()
184
185 self._global_handler = None
186 self._handler = None
187
188
193
202
203 @property
211
213 """ This """
214 if self.errors:
215 for exc, value, tb in self.errors[:-1]:
216 traceback.print_exception(exc, value, tb)
217 exc, value, tb = self.errors[-1]
218 _compat.raise_(exc, value, tb)
219
259
261 self._stop = True
262 self._check_errors()
263
266
277
279 while self._timers > 0:
280 t = self._timerheap[0]
281 if t._cancelled:
282 heapq.heappop(self._timerheap)
283 self._timers -= 1
284 elif t._deadline > self._now:
285 return
286 else:
287 heapq.heappop(self._timerheap)
288 self._timers -= 1
289 self.push_event(t, Event.TIMER_TASK)
290
291 @property
293 while self._timers > 0:
294 t = self._timerheap[0]
295 if t._cancelled:
296 heapq.heappop(self._timerheap)
297 self._timers -= 1
298 else:
299 return t._deadline
300 return None
301
302 - def acceptor(self, host, port, handler=None):
309
311 """Deprecated: use connection_to_host() instead
312 """
313 impl = self._make_handler(handler)
314 result = Connection()
315 if impl:
316 result.handler = impl
317 result._reactor = self
318 result.collect(self._collector)
319 return result
320
322 """Create an outgoing Connection that will be managed by the reactor.
323 The reactor's pn_iohandler will create a socket connection to the host
324 once the connection is opened.
325 """
326 conn = self.connection(handler)
327 self.set_connection_host(conn, host, port)
328 return conn
329
331 """Change the address used by the connection. The address is
332 used by the reactor's iohandler to create an outgoing socket
333 connection. This must be set prior to opening the connection.
334 """
335 connection.set_address(host, port)
336
338 """This may be used to retrieve the remote peer address.
339 @return: string containing the address in URL format or None if no
340 address is available. Use the proton.Url class to create a Url object
341 from the returned value.
342 """
343 _url = connection.get_address()
344 return utf82unicode(_url)
345
346 - def selectable(self, handler=None, delegate=None):
354
355 - def update(self, selectable):
357
359 self._collector.put(obj, etype)
360
363 """
364 Can be added to a reactor to allow events to be triggered by an
365 external thread but handled on the event thread associated with
366 the reactor. An instance of this class can be passed to the
367 Reactor.selectable() method of the reactor in order to activate
368 it. The close() method should be called when it is no longer
369 needed, to allow the event loop to end if needed.
370 """
371
373 self.queue = queue.Queue()
374 self.pipe = os.pipe()
375 self._transport = None
376 self._closed = False
377
379 """
380 Request that the given event be dispatched on the event thread
381 of the reactor to which this EventInjector was added.
382 """
383 self.queue.put(event)
384 os.write(self.pipe[1], b"!")
385
387 """
388 Request that this EventInjector be closed. Existing events
389 will be dispatched on the reactors event dispatch thread,
390 then this will be removed from the set of interest.
391 """
392 self._closed = True
393 os.write(self.pipe[1], b"!")
394
397
399 sel = event.context
400
401 sel.reading = True
402 sel.update()
403
413
416 """
417 Application defined event, which can optionally be associated with
418 an engine object and or an arbitrary subject
419 """
420
421 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
435
436 @property
439
443
446 """
447 Class to track state of an AMQP 1.0 transaction.
448 """
449
450 - def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
451 self.txn_ctrl = txn_ctrl
452 self.handler = handler
453 self.id = None
454 self._declare = None
455 self._discharge = None
456 self.failed = False
457 self._pending = []
458 self.settle_before_discharge = settle_before_discharge
459 self.declare()
460
463
466
468 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
469
473
478
479 - def send(self, sender, msg, tag=None):
484
491
492 - def update(self, delivery, state=None):
496
502
505
528
531 """
532 Abstract interface for link configuration options
533 """
534
536 """
537 Subclasses will implement any configuration logic in this
538 method
539 """
540 pass
541
542 - def test(self, link):
543 """
544 Subclasses can override this to selectively apply an option
545 e.g. based on some link criteria
546 """
547 return True
548
553
559
562 - def apply(self, sender): pass
563
565
568 - def apply(self, receiver): pass
569
571
587
588
589 -class Filter(ReceiverOption):
591 self.filter_set = filter_set
592
593 - def apply(self, receiver):
595
598 """
599 Configures a link with a message selector filter
600 """
601
602 - def __init__(self, value, name='selector'):
604
607 - def apply(self, receiver):
610
611
612 -class Move(ReceiverOption):
613 - def apply(self, receiver):
615
616
617 -class Copy(ReceiverOption):
618 - def apply(self, receiver):
620
629
635
642
646 self._default_session = None
647
649 if not self._default_session:
650 self._default_session = _create_session(connection)
651 return self._default_session
652
655 """
656 Internal handler that triggers the necessary socket connect for an
657 opened connection.
658 """
659
662
664 if not self._override(event):
665 event.dispatch(self.base)
666
668 conn = event.connection
669 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
670
673
674 - def __init__(self, reactor, host, port, handler=None):
684
685 - def set_ssl_domain(self, ssl_domain):
686 self._ssl_domain = ssl_domain
687
693
714
716 """
717 Internal handler that triggers the necessary socket connect for an
718 opened connection.
719 """
720
735
768
771
776
779
798
801
804 """
805 A reconnect strategy involving an increasing delay between
806 retries, up to a maximum or 10 seconds.
807 """
808
811
814
822
823
824 -class Urls(object):
828
831
833 try:
834 return next(self.i)
835 except StopIteration:
836 self.i = iter(self.values)
837 return next(self.i)
838
852
854 confname = 'connect.json'
855 confpath = ['.', '~/.config/messaging','/etc/messaging']
856 for d in confpath:
857 f = os.path.join(d, confname)
858 if os.path.isfile(f):
859 return f
860 return None
861
863 conf = os.environ.get('MESSAGING_CONNECT_FILE') or _find_config_file()
864 if conf and os.path.isfile(conf):
865 with open(conf, 'r') as f:
866 json_text = f.read()
867 json_text = _strip_json_comments(json_text)
868 return json.loads(json_text)
869 else:
870 return {}
871
881 pattern = re.compile(r'//.*?$|/\*.*?\*/|\'(?:\\.|[^\\\'])*\'|"(?:\\.|[^\\"])*"', re.DOTALL | re.MULTILINE)
882 return re.sub(pattern, replacer, json_text)
883
885 if scheme == 'amqps':
886 return 5671
887 else:
888 return 5672
889
891 """A representation of the AMQP concept of a 'container', which
892 loosely speaking is something that establishes links to or from
893 another container, over which messages are transfered. This is
894 an extension to the Reactor class that adds convenience methods
895 for creating connections and sender- or receiver- links.
896 """
897
898 - def __init__(self, *handlers, **kwargs):
913
914 - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None,
915 **kwargs):
916 """
917 Initiates the establishment of an AMQP connection. Returns an
918 instance of proton.Connection.
919
920 @param url: URL string of process to connect to
921
922 @param urls: list of URL strings of process to try to connect to
923
924 Only one of url or urls should be specified.
925
926 @param reconnect: Reconnect is enabled by default. You can
927 pass in an instance of Backoff to control reconnect behavior.
928 A value of False will prevent the library from automatically
929 trying to reconnect if the underlying socket is disconnected
930 before the connection has been closed.
931
932 @param heartbeat: A value in milliseconds indicating the
933 desired frequency of heartbeats used to test the underlying
934 socket is alive.
935
936 @param ssl_domain: SSL configuration in the form of an
937 instance of proton.SSLDomain.
938
939 @param handler: a connection scoped handler that will be
940 called to process any events in the scope of this connection
941 or its child links
942
943 @param kwargs: 'sasl_enabled', which determines whether a sasl
944 layer is used for the connection. 'allowed_mechs', an optional
945 string specifying the SASL mechanisms allowed for this
946 connection; the value is a space-separated list of mechanism
947 names; the mechanisms allowed by default are determined by
948 your SASL library and system configuration, with two
949 exceptions: GSSAPI and GSS-SPNEGO are disabled by default; to
950 enable them, you must explicitly add them using this option;
951 clients must set the allowed mechanisms before the the
952 outgoing connection is attempted; servers must set them before
953 the listening connection is setup. 'allow_insecure_mechs', a
954 flag indicating whether insecure mechanisms, such as PLAIN
955 over a non-encrypted socket, are allowed. 'virtual_host', the
956 hostname to set in the Open performative used by peer to
957 determine the correct back-end service for the client; if
958 'virtual_host' is not supplied the host field from the URL is
959 used instead. 'user', the user to authenticate. 'password',
960 the authentication secret.
961
962 """
963 if not url and not urls and not address:
964 config = _get_default_config()
965 scheme = config.get('scheme', 'amqp')
966 _url = "%s://%s:%s" % (scheme, config.get('host', 'localhost'), config.get('port', _get_default_port_for_scheme(scheme)))
967 _ssl_domain = None
968 _kwargs = kwargs
969 if config.get('user'):
970 _kwargs['user'] = config.get('user')
971 if config.get('password'):
972 _kwargs['password'] = config.get('password')
973 sasl_config = config.get('sasl', {})
974 _kwargs['sasl_enabled'] = sasl_config.get('enabled', True)
975 if sasl_config.get('mechanisms'):
976 _kwargs['allowed_mechs'] = sasl_config.get('mechanisms')
977 tls_config = config.get('tls', {})
978 if scheme == 'amqps':
979 _ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
980 ca = tls_config.get('ca')
981 cert = tls_config.get('cert')
982 key = tls_config.get('key')
983 if ca:
984 _ssl_domain.set_trusted_ca_db(str(ca))
985 if tls_config.get('verify', True):
986 _ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, str(ca))
987 if cert and key:
988 _ssl_domain.set_credentials(str(cert), str(key), None)
989
990 return self._connect(_url, handler=handler, reconnect=reconnect, heartbeat=heartbeat, ssl_domain=_ssl_domain, **_kwargs)
991 else:
992 return self._connect(url=url, urls=urls, handler=handler, reconnect=reconnect, heartbeat=heartbeat, ssl_domain=ssl_domain, **kwargs)
993
994 - def _connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
995 conn = self.connection(handler)
996 conn.container = self.container_id or str(_generate_uuid())
997 conn.offered_capabilities = kwargs.get('offered_capabilities')
998 conn.desired_capabilities = kwargs.get('desired_capabilities')
999 conn.properties = kwargs.get('properties')
1000
1001 connector = Connector(conn)
1002 connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
1003 connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
1004 connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
1005 connector.user = kwargs.get('user', self.user)
1006 connector.password = kwargs.get('password', self.password)
1007 connector.virtual_host = kwargs.get('virtual_host')
1008 if connector.virtual_host:
1009
1010 conn.hostname = connector.virtual_host
1011 connector.ssl_sni = kwargs.get('sni')
1012 connector.max_frame_size = kwargs.get('max_frame_size')
1013
1014 conn._overrides = connector
1015 if url:
1016 connector.address = Urls([url])
1017 elif urls:
1018 connector.address = Urls(urls)
1019 elif address:
1020 connector.address = address
1021 else:
1022 raise ValueError("One of url, urls or address required")
1023 if heartbeat:
1024 connector.heartbeat = heartbeat
1025 if reconnect:
1026 connector.reconnect = reconnect
1027 elif reconnect is None:
1028 connector.reconnect = Backoff()
1029
1030
1031 connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
1032 conn._session_policy = SessionPerConnection()
1033 conn.open()
1034 return conn
1035
1036 - def _get_id(self, container, remote, local):
1037 if local and remote:
1038 "%s-%s-%s" % (container, remote, local)
1039 elif local:
1040 return "%s-%s" % (container, local)
1041 elif remote:
1042 return "%s-%s" % (container, remote)
1043 else:
1044 return "%s-%s" % (container, str(_generate_uuid()))
1045
1058
1059 - def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
1060 """
1061 Initiates the establishment of a link over which messages can
1062 be sent. Returns an instance of proton.Sender.
1063
1064 There are two patterns of use. (1) A connection can be passed
1065 as the first argument, in which case the link is established
1066 on that connection. In this case the target address can be
1067 specified as the second argument (or as a keyword
1068 argument). The source address can also be specified if
1069 desired. (2) Alternatively a URL can be passed as the first
1070 argument. In this case a new connection will be established on
1071 which the link will be attached. If a path is specified and
1072 the target is not, then the path of the URL is used as the
1073 target address.
1074
1075 The name of the link may be specified if desired, otherwise a
1076 unique name will be generated.
1077
1078 Various LinkOptions can be specified to further control the
1079 attachment.
1080 """
1081 if isstring(context):
1082 context = Url(context)
1083 if isinstance(context, Url) and not target:
1084 target = context.path
1085 session = self._get_session(context)
1086 snd = session.sender(name or self._get_id(session.connection.container, target, source))
1087 if source:
1088 snd.source.address = source
1089 if target:
1090 snd.target.address = target
1091 if handler is not None:
1092 snd.handler = handler
1093 if tags:
1094 snd.tag_generator = tags
1095 _apply_link_options(options, snd)
1096 snd.open()
1097 return snd
1098
1099 - def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
1100 """
1101 Initiates the establishment of a link over which messages can
1102 be received (aka a subscription). Returns an instance of
1103 proton.Receiver.
1104
1105 There are two patterns of use. (1) A connection can be passed
1106 as the first argument, in which case the link is established
1107 on that connection. In this case the source address can be
1108 specified as the second argument (or as a keyword
1109 argument). The target address can also be specified if
1110 desired. (2) Alternatively a URL can be passed as the first
1111 argument. In this case a new connection will be established on
1112 which the link will be attached. If a path is specified and
1113 the source is not, then the path of the URL is used as the
1114 target address.
1115
1116 The name of the link may be specified if desired, otherwise a
1117 unique name will be generated.
1118
1119 Various LinkOptions can be specified to further control the
1120 attachment.
1121 """
1122 if isstring(context):
1123 context = Url(context)
1124 if isinstance(context, Url) and not source:
1125 source = context.path
1126 session = self._get_session(context)
1127 rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
1128 if source:
1129 rcv.source.address = source
1130 if dynamic:
1131 rcv.source.dynamic = True
1132 if target:
1133 rcv.target.address = target
1134 if handler is not None:
1135 rcv.handler = handler
1136 _apply_link_options(options, rcv)
1137 rcv.open()
1138 return rcv
1139
1141 if not _get_attr(context, '_txn_ctrl'):
1142 class InternalTransactionHandler(OutgoingMessageHandler):
1143 def __init__(self):
1144 super(InternalTransactionHandler, self).__init__(auto_settle=True)
1145
1146 def on_settled(self, event):
1147 if hasattr(event.delivery, "transaction"):
1148 event.transaction = event.delivery.transaction
1149 event.delivery.transaction.handle_outcome(event)
1150
1151 def on_unhandled(self, method, event):
1152 if handler:
1153 event.dispatch(handler)
1154
1155 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
1156 context._txn_ctrl.target.type = Terminus.COORDINATOR
1157 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
1158 return Transaction(context._txn_ctrl, handler, settle_before_discharge)
1159
1160 - def listen(self, url, ssl_domain=None):
1161 """
1162 Initiates a server socket, accepting incoming AMQP connections
1163 on the interface and port specified.
1164 """
1165 url = Url(url)
1166 acceptor = self.acceptor(url.host, url.port)
1167 ssl_config = ssl_domain
1168 if not ssl_config and url.scheme == 'amqps':
1169
1170 if self.ssl:
1171 ssl_config = self.ssl.server
1172 else:
1173 raise SSLUnavailable("amqps: SSL libraries not found")
1174 if ssl_config:
1175 acceptor.set_ssl_domain(ssl_config)
1176 return acceptor
1177
1182