Package proton :: Module reactor
[frames] | no frames]

Source Code for Module proton.reactor

  1  from __future__ import absolute_import 
  2  # 
  3  # Licensed to the Apache Software Foundation (ASF) under one 
  4  # or more contributor license agreements.  See the NOTICE file 
  5  # distributed with this work for additional information 
  6  # regarding copyright ownership.  The ASF licenses this file 
  7  # to you under the Apache License, Version 2.0 (the 
  8  # "License"); you may not use this file except in compliance 
  9  # with the License.  You may obtain a copy of the License at 
 10  # 
 11  #   http://www.apache.org/licenses/LICENSE-2.0 
 12  # 
 13  # Unless required by applicable law or agreed to in writing, 
 14  # software distributed under the License is distributed on an 
 15  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 16  # KIND, either express or implied.  See the License for the 
 17  # specific language governing permissions and limitations 
 18  # under the License. 
 19  # 
 20  import logging, os, socket, time, types 
 21  from heapq import heappush, heappop, nsmallest 
 22  from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch 
 23  from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message 
 24  from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol 
 25  from proton import Terminus, Timeout, Transport, TransportException, ulong, Url 
 26  from select import select 
 27  from proton.handlers import OutgoingMessageHandler 
 28  from proton import unicode2utf8, utf82unicode 
 29   
 30  import traceback 
 31  from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable 
 32  from .wrapper import Wrapper, PYCTX 
 33  from cproton import * 
 34  from . import _compat 
 35   
 36  try: 
 37      import Queue 
 38  except ImportError: 
 39      import queue as Queue 
40 41 -class Task(Wrapper):
42 43 @staticmethod
44 - def wrap(impl):
45 if impl is None: 46 return None 47 else: 48 return Task(impl)
49
50 - def __init__(self, impl):
51 Wrapper.__init__(self, impl, pn_task_attachments)
52
53 - def _init(self):
54 pass
55
56 - def cancel(self):
57 pn_task_cancel(self._impl)
58
59 -class Acceptor(Wrapper):
60
61 - def __init__(self, impl):
62 Wrapper.__init__(self, impl)
63
64 - def set_ssl_domain(self, ssl_domain):
65 pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
66
67 - def close(self):
68 pn_acceptor_close(self._impl)
69
70 -class Reactor(Wrapper):
71 72 @staticmethod
73 - def wrap(impl):
74 if impl is None: 75 return None 76 else: 77 record = pn_reactor_attachments(impl) 78 attrs = pn_void2py(pn_record_get(record, PYCTX)) 79 if attrs and 'subclass' in attrs: 80 return attrs['subclass'](impl=impl) 81 else: 82 return Reactor(impl=impl)
83
84 - def __init__(self, *handlers, **kwargs):
85 Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments) 86 for h in handlers: 87 self.handler.add(h)
88
89 - def _init(self):
90 self.errors = []
91
92 - def on_error(self, info):
93 self.errors.append(info) 94 self.yield_()
95
96 - def _get_global(self):
97 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
98
99 - def _set_global(self, handler):
100 impl = _chandler(handler, self.on_error) 101 pn_reactor_set_global_handler(self._impl, impl) 102 pn_decref(impl)
103 104 global_handler = property(_get_global, _set_global) 105
106 - def _get_timeout(self):
107 return millis2timeout(pn_reactor_get_timeout(self._impl))
108
109 - def _set_timeout(self, secs):
110 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
111 112 timeout = property(_get_timeout, _set_timeout) 113
114 - def yield_(self):
115 pn_reactor_yield(self._impl)
116
117 - def mark(self):
118 return pn_reactor_mark(self._impl)
119
120 - def _get_handler(self):
121 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
122
123 - def _set_handler(self, handler):
124 impl = _chandler(handler, self.on_error) 125 pn_reactor_set_handler(self._impl, impl) 126 pn_decref(impl)
127 128 handler = property(_get_handler, _set_handler) 129
130 - def run(self):
131 self.timeout = 3.14159265359 132 self.start() 133 while self.process(): pass 134 self.stop()
135
136 - def wakeup(self):
137 n = pn_reactor_wakeup(self._impl) 138 if n: raise IOError(pn_error_text(pn_io_error(pn_reactor_io(self._impl))))
139
140 - def start(self):
141 pn_reactor_start(self._impl)
142 143 @property
144 - def quiesced(self):
145 return pn_reactor_quiesced(self._impl)
146
147 - def _check_errors(self):
148 if self.errors: 149 for exc, value, tb in self.errors[:-1]: 150 traceback.print_exception(exc, value, tb) 151 exc, value, tb = self.errors[-1] 152 _compat.raise_(exc, value, tb)
153
154 - def process(self):
155 result = pn_reactor_process(self._impl) 156 self._check_errors() 157 return result
158
159 - def stop(self):
160 pn_reactor_stop(self._impl) 161 self._check_errors() 162 self.global_handler = None 163 self.handler = None
164
165 - def schedule(self, delay, task):
166 impl = _chandler(task, self.on_error) 167 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl)) 168 pn_decref(impl) 169 return task
170
171 - def acceptor(self, host, port, handler=None):
172 impl = _chandler(handler, self.on_error) 173 aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl) 174 pn_decref(impl) 175 if aimpl: 176 return Acceptor(aimpl) 177 else: 178 raise IOError("%s (%s:%s)" % (pn_error_text(pn_io_error(pn_reactor_io(self._impl))), host, port))
179
180 - def connection(self, handler=None):
181 """Deprecated: use connection_to_host() instead 182 """ 183 impl = _chandler(handler, self.on_error) 184 result = Connection.wrap(pn_reactor_connection(self._impl, impl)) 185 if impl: pn_decref(impl) 186 return result
187
188 - def connection_to_host(self, host, port, handler=None):
189 """Create an outgoing Connection that will be managed by the reactor. 190 The reator's pn_iohandler will create a socket connection to the host 191 once the connection is opened. 192 """ 193 conn = self.connection(handler) 194 self.set_connection_host(conn, host, port) 195 return conn
196
197 - def set_connection_host(self, connection, host, port):
198 """Change the address used by the connection. The address is 199 used by the reactor's iohandler to create an outgoing socket 200 connection. This must be set prior to opening the connection. 201 """ 202 pn_reactor_set_connection_host(self._impl, 203 connection._impl, 204 unicode2utf8(str(host)), 205 unicode2utf8(str(port)))
206
207 - def get_connection_address(self, connection):
208 """This may be used to retrieve the remote peer address. 209 @return: string containing the address in URL format or None if no 210 address is available. Use the proton.Url class to create a Url object 211 from the returned value. 212 """ 213 _url = pn_reactor_get_connection_address(self._impl, connection._impl) 214 return utf82unicode(_url)
215
216 - def selectable(self, handler=None):
217 impl = _chandler(handler, self.on_error) 218 result = Selectable.wrap(pn_reactor_selectable(self._impl)) 219 if impl: 220 record = pn_selectable_attachments(result._impl) 221 pn_record_set_handler(record, impl) 222 pn_decref(impl) 223 return result
224
225 - def update(self, sel):
226 pn_reactor_update(self._impl, sel._impl)
227
228 - def push_event(self, obj, etype):
229 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
230 231 from proton import wrappers as _wrappers 232 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x)) 233 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
234 235 236 -class EventInjector(object):
237 """ 238 Can be added to a reactor to allow events to be triggered by an 239 external thread but handled on the event thread associated with 240 the reactor. An instance of this class can be passed to the 241 Reactor.selectable() method of the reactor in order to activate 242 it. The close() method should be called when it is no longer 243 needed, to allow the event loop to end if needed. 244 """
245 - def __init__(self):
246 self.queue = Queue.Queue() 247 self.pipe = os.pipe() 248 self._closed = False
249
250 - def trigger(self, event):
251 """ 252 Request that the given event be dispatched on the event thread 253 of the reactor to which this EventInjector was added. 254 """ 255 self.queue.put(event) 256 os.write(self.pipe[1], _compat.str2bin("!"))
257
258 - def close(self):
259 """ 260 Request that this EventInjector be closed. Existing events 261 will be dispctahed on the reactors event dispactch thread, 262 then this will be removed from the set of interest. 263 """ 264 self._closed = True 265 os.write(self.pipe[1], _compat.str2bin("!"))
266
267 - def fileno(self):
268 return self.pipe[0]
269
270 - def on_selectable_init(self, event):
271 sel = event.context 272 sel.fileno(self.fileno()) 273 sel.reading = True 274 event.reactor.update(sel)
275
276 - def on_selectable_readable(self, event):
277 os.read(self.pipe[0], 512) 278 while not self.queue.empty(): 279 requested = self.queue.get() 280 event.reactor.push_event(requested.context, requested.type) 281 if self._closed: 282 s = event.context 283 s.terminate() 284 event.reactor.update(s)
285
286 287 -class ApplicationEvent(EventBase):
288 """ 289 Application defined event, which can optionally be associated with 290 an engine object and or an arbitrary subject 291 """
292 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
293 super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename)) 294 self.connection = connection 295 self.session = session 296 self.link = link 297 self.delivery = delivery 298 if self.delivery: 299 self.link = self.delivery.link 300 if self.link: 301 self.session = self.link.session 302 if self.session: 303 self.connection = self.session.connection 304 self.subject = subject
305
306 - def __repr__(self):
307 objects = [self.connection, self.session, self.link, self.delivery, self.subject] 308 return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
309
310 -class Transaction(object):
311 """ 312 Class to track state of an AMQP 1.0 transaction. 313 """
314 - def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
315 self.txn_ctrl = txn_ctrl 316 self.handler = handler 317 self.id = None 318 self._declare = None 319 self._discharge = None 320 self.failed = False 321 self._pending = [] 322 self.settle_before_discharge = settle_before_discharge 323 self.declare()
324
325 - def commit(self):
326 self.discharge(False)
327
328 - def abort(self):
329 self.discharge(True)
330
331 - def declare(self):
332 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
333
334 - def discharge(self, failed):
335 self.failed = failed 336 self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
337
338 - def _send_ctrl(self, descriptor, value):
339 delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value))) 340 delivery.transaction = self 341 return delivery
342
343 - def send(self, sender, msg, tag=None):
344 dlv = sender.send(msg, tag=tag) 345 dlv.local.data = [self.id] 346 dlv.update(0x34) 347 return dlv
348
349 - def accept(self, delivery):
350 self.update(delivery, PN_ACCEPTED) 351 if self.settle_before_discharge: 352 delivery.settle() 353 else: 354 self._pending.append(delivery)
355
356 - def update(self, delivery, state=None):
357 if state: 358 delivery.local.data = [self.id, Described(ulong(state), [])] 359 delivery.update(0x34)
360
361 - def _release_pending(self):
362 for d in self._pending: 363 d.update(Delivery.RELEASED) 364 d.settle() 365 self._clear_pending()
366
367 - def _clear_pending(self):
368 self._pending = []
369
370 - def handle_outcome(self, event):
371 if event.delivery == self._declare: 372 if event.delivery.remote.data: 373 self.id = event.delivery.remote.data[0] 374 self.handler.on_transaction_declared(event) 375 elif event.delivery.remote_state == Delivery.REJECTED: 376 self.handler.on_transaction_declare_failed(event) 377 else: 378 logging.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state) 379 self.handler.on_transaction_declare_failed(event) 380 elif event.delivery == self._discharge: 381 if event.delivery.remote_state == Delivery.REJECTED: 382 if not self.failed: 383 self.handler.on_transaction_commit_failed(event) 384 self._release_pending() # make this optional? 385 else: 386 if self.failed: 387 self.handler.on_transaction_aborted(event) 388 self._release_pending() 389 else: 390 self.handler.on_transaction_committed(event) 391 self._clear_pending()
392
393 -class LinkOption(object):
394 """ 395 Abstract interface for link configuration options 396 """
397 - def apply(self, link):
398 """ 399 Subclasses will implement any configuration logic in this 400 method 401 """ 402 pass
403 - def test(self, link):
404 """ 405 Subclasses can override this to selectively apply an option 406 e.g. based on some link criteria 407 """ 408 return True
409
410 -class AtMostOnce(LinkOption):
411 - def apply(self, link):
413
414 -class AtLeastOnce(LinkOption):
415 - def apply(self, link):
418
419 -class SenderOption(LinkOption):
420 - def apply(self, sender): pass
421 - def test(self, link): return link.is_sender
422
423 -class ReceiverOption(LinkOption):
424 - def apply(self, receiver): pass
425 - def test(self, link): return link.is_receiver
426
427 -class DynamicNodeProperties(LinkOption):
428 - def __init__(self, props={}):
429 self.properties = {} 430 for k in props: 431 if isinstance(k, symbol): 432 self.properties[k] = props[k] 433 else: 434 self.properties[symbol(k)] = props[k]
435
436 - def apply(self, link):
441
442 -class Filter(ReceiverOption):
443 - def __init__(self, filter_set={}):
444 self.filter_set = filter_set
445
446 - def apply(self, receiver):
447 receiver.source.filter.put_dict(self.filter_set)
448
449 -class Selector(Filter):
450 """ 451 Configures a link with a message selector filter 452 """
453 - def __init__(self, value, name='selector'):
454 super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
455
456 -class DurableSubscription(ReceiverOption):
457 - def apply(self, receiver):
460
461 -class Move(ReceiverOption):
462 - def apply(self, receiver):
464
465 -class Copy(ReceiverOption):
466 - def apply(self, receiver):
468 476
477 -def _create_session(connection, handler=None):
478 session = connection.session() 479 session.open() 480 return session
481
482 483 -def _get_attr(target, name):
484 if hasattr(target, name): 485 return getattr(target, name) 486 else: 487 return None
488
489 -class SessionPerConnection(object):
490 - def __init__(self):
491 self._default_session = None
492
493 - def session(self, connection):
494 if not self._default_session: 495 self._default_session = _create_session(connection) 496 self._default_session.context = self 497 return self._default_session
498
499 - def on_session_remote_close(self, event):
500 event.connection.close() 501 self._default_session = None
502
503 -class GlobalOverrides(object):
504 """ 505 Internal handler that triggers the necessary socket connect for an 506 opened connection. 507 """
508 - def __init__(self, base):
509 self.base = base
510
511 - def on_unhandled(self, name, event):
512 if not self._override(event): 513 event.dispatch(self.base)
514
515 - def _override(self, event):
516 conn = event.connection 517 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
518
519 -class Connector(Handler):
520 """ 521 Internal handler that triggers the necessary socket connect for an 522 opened connection. 523 """
524 - def __init__(self, connection):
525 self.connection = connection 526 self.address = None 527 self.heartbeat = None 528 self.reconnect = None 529 self.ssl_domain = None 530 self.allow_insecure_mechs = True 531 self.allowed_mechs = None 532 self.sasl_enabled = True 533 self.user = None 534 self.password = None 535 self.virtual_host = None
536
537 - def _connect(self, connection, reactor):
538 assert(reactor is not None) 539 url = self.address.next() 540 reactor.set_connection_host(connection, url.host, str(url.port)) 541 # if virtual-host not set, use host from address as default 542 if self.virtual_host is None: 543 connection.hostname = url.host 544 logging.debug("connecting to %s..." % url) 545 546 transport = Transport() 547 if self.sasl_enabled: 548 sasl = transport.sasl() 549 sasl.allow_insecure_mechs = self.allow_insecure_mechs 550 if url.username: 551 connection.user = url.username 552 elif self.user: 553 connection.user = self.user 554 if url.password: 555 connection.password = url.password 556 elif self.password: 557 connection.password = self.password 558 if self.allowed_mechs: 559 sasl.allowed_mechs(self.allowed_mechs) 560 transport.bind(connection) 561 if self.heartbeat: 562 transport.idle_timeout = self.heartbeat 563 if url.scheme == 'amqps': 564 if not self.ssl_domain: 565 raise SSLUnavailable("amqps: SSL libraries not found") 566 self.ssl = SSL(transport, self.ssl_domain) 567 self.ssl.peer_hostname = url.host
568
569 - def on_connection_local_open(self, event):
570 self._connect(event.connection, event.reactor)
571
572 - def on_connection_remote_open(self, event):
573 logging.debug("connected to %s" % event.connection.hostname) 574 if self.reconnect: 575 self.reconnect.reset() 576 self.transport = None
577
578 - def on_transport_tail_closed(self, event):
579 self.on_transport_closed(event)
580
581 - def on_transport_closed(self, event):
582 if self.connection and self.connection.state & Endpoint.LOCAL_ACTIVE: 583 if self.reconnect: 584 event.transport.unbind() 585 delay = self.reconnect.next() 586 if delay == 0: 587 logging.info("Disconnected, reconnecting...") 588 self._connect(self.connection, event.reactor) 589 else: 590 logging.info("Disconnected will try to reconnect after %s seconds" % delay) 591 event.reactor.schedule(delay, self) 592 else: 593 logging.debug("Disconnected") 594 self.connection = None
595
596 - def on_timer_task(self, event):
597 self._connect(self.connection, event.reactor)
598
599 - def on_connection_remote_close(self, event):
600 self.connection = None
601
602 -class Backoff(object):
603 """ 604 A reconnect strategy involving an increasing delay between 605 retries, up to a maximum or 10 seconds. 606 """
607 - def __init__(self):
608 self.delay = 0
609
610 - def reset(self):
611 self.delay = 0
612
613 - def next(self):
614 current = self.delay 615 if current == 0: 616 self.delay = 0.1 617 else: 618 self.delay = min(10, 2*current) 619 return current
620
621 -class Urls(object):
622 - def __init__(self, values):
623 self.values = [Url(v) for v in values] 624 self.i = iter(self.values)
625
626 - def __iter__(self):
627 return self
628
629 - def next(self):
630 try: 631 return next(self.i) 632 except StopIteration: 633 self.i = iter(self.values) 634 return next(self.i)
635
636 -class SSLConfig(object):
637 - def __init__(self):
638 self.client = SSLDomain(SSLDomain.MODE_CLIENT) 639 self.server = SSLDomain(SSLDomain.MODE_SERVER)
640
641 - def set_credentials(self, cert_file, key_file, password):
642 self.client.set_credentials(cert_file, key_file, password) 643 self.server.set_credentials(cert_file, key_file, password)
644
645 - def set_trusted_ca_db(self, certificate_db):
646 self.client.set_trusted_ca_db(certificate_db) 647 self.server.set_trusted_ca_db(certificate_db)
648
649 650 -class Container(Reactor):
651 """A representation of the AMQP concept of a 'container', which 652 lossely speaking is something that establishes links to or from 653 another container, over which messages are transfered. This is 654 an extension to the Reactor class that adds convenience methods 655 for creating connections and sender- or receiver- links. 656 """
657 - def __init__(self, *handlers, **kwargs):
658 super(Container, self).__init__(*handlers, **kwargs) 659 if "impl" not in kwargs: 660 try: 661 self.ssl = SSLConfig() 662 except SSLUnavailable: 663 self.ssl = None 664 self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler)) 665 self.trigger = None 666 self.container_id = str(generate_uuid()) 667 self.allow_insecure_mechs = True 668 self.allowed_mechs = None 669 self.sasl_enabled = True 670 self.user = None 671 self.password = None 672 Wrapper.__setattr__(self, 'subclass', self.__class__)
673
674 - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
675 """ 676 Initiates the establishment of an AMQP connection. Returns an 677 instance of proton.Connection. 678 679 @param url: URL string of process to connect to 680 681 @param urls: list of URL strings of process to try to connect to 682 683 Only one of url or urls should be specified. 684 685 @param reconnect: A value of False will prevent the library 686 form automatically trying to reconnect if the underlying 687 socket is disconnected before the connection has been closed. 688 689 @param heartbeat: A value in milliseconds indicating the 690 desired frequency of heartbeats used to test the underlying 691 socket is alive. 692 693 @param ssl_domain: SSL configuration in the form of an 694 instance of proton.SSLdomain. 695 696 @param handler: a connection scoped handler that will be 697 called to process any events in the scope of this connection 698 or its child links 699 700 @param kwargs: sasl_enabled, which determines whether a sasl layer is 701 used for the connection; allowed_mechs an optional list of SASL 702 mechanisms to allow if sasl is enabled; allow_insecure_mechs a flag 703 indicating whether insecure mechanisms, such as PLAIN over a 704 non-encrypted socket, are allowed; 'virtual_host' the hostname to set 705 in the Open performative used by peer to determine the correct 706 back-end service for the client. If 'virtual_host' is not supplied the 707 host field from the URL is used instead." 708 709 """ 710 conn = self.connection(handler) 711 conn.container = self.container_id or str(generate_uuid()) 712 conn.offered_capabilities = kwargs.get('offered_capabilities') 713 conn.desired_capabilities = kwargs.get('desired_capabilities') 714 conn.properties = kwargs.get('properties') 715 716 connector = Connector(conn) 717 connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs) 718 connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs) 719 connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled) 720 connector.user = kwargs.get('user', self.user) 721 connector.password = kwargs.get('password', self.password) 722 connector.virtual_host = kwargs.get('virtual_host') 723 if connector.virtual_host: 724 # only set hostname if virtual-host is a non-empty string 725 conn.hostname = connector.virtual_host 726 727 conn._overrides = connector 728 if url: connector.address = Urls([url]) 729 elif urls: connector.address = Urls(urls) 730 elif address: connector.address = address 731 else: raise ValueError("One of url, urls or address required") 732 if heartbeat: 733 connector.heartbeat = heartbeat 734 if reconnect: 735 connector.reconnect = reconnect 736 elif reconnect is None: 737 connector.reconnect = Backoff() 738 # use container's default client domain if none specified. This is 739 # only necessary of the URL specifies the "amqps:" scheme 740 connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client) 741 conn._session_policy = SessionPerConnection() #todo: make configurable 742 conn.open() 743 return conn
744
745 - def _get_id(self, container, remote, local):
746 if local and remote: "%s-%s-%s" % (container, remote, local) 747 elif local: return "%s-%s" % (container, local) 748 elif remote: return "%s-%s" % (container, remote) 749 else: return "%s-%s" % (container, str(generate_uuid()))
750
751 - def _get_session(self, context):
752 if isinstance(context, Url): 753 return self._get_session(self.connect(url=context)) 754 elif isinstance(context, Session): 755 return context 756 elif isinstance(context, Connection): 757 if hasattr(context, '_session_policy'): 758 return context._session_policy.session(context) 759 else: 760 return _create_session(context) 761 else: 762 return context.session()
763
764 - def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
765 """ 766 Initiates the establishment of a link over which messages can 767 be sent. Returns an instance of proton.Sender. 768 769 There are two patterns of use. (1) A connection can be passed 770 as the first argument, in which case the link is established 771 on that connection. In this case the target address can be 772 specified as the second argument (or as a keyword 773 argument). The source address can also be specified if 774 desired. (2) Alternatively a URL can be passed as the first 775 argument. In this case a new connection will be establised on 776 which the link will be attached. If a path is specified and 777 the target is not, then the path of the URL is used as the 778 target address. 779 780 The name of the link may be specified if desired, otherwise a 781 unique name will be generated. 782 783 Various LinkOptions can be specified to further control the 784 attachment. 785 """ 786 if isinstance(context, _compat.STRING_TYPES): 787 context = Url(context) 788 if isinstance(context, Url) and not target: 789 target = context.path 790 session = self._get_session(context) 791 snd = session.sender(name or self._get_id(session.connection.container, target, source)) 792 if source: 793 snd.source.address = source 794 if target: 795 snd.target.address = target 796 if handler != None: 797 snd.handler = handler 798 if tags: 799 snd.tag_generator = tags 800 _apply_link_options(options, snd) 801 snd.open() 802 return snd
803
804 - def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
805 """ 806 Initiates the establishment of a link over which messages can 807 be received (aka a subscription). Returns an instance of 808 proton.Receiver. 809 810 There are two patterns of use. (1) A connection can be passed 811 as the first argument, in which case the link is established 812 on that connection. In this case the source address can be 813 specified as the second argument (or as a keyword 814 argument). The target address can also be specified if 815 desired. (2) Alternatively a URL can be passed as the first 816 argument. In this case a new connection will be establised on 817 which the link will be attached. If a path is specified and 818 the source is not, then the path of the URL is used as the 819 target address. 820 821 The name of the link may be specified if desired, otherwise a 822 unique name will be generated. 823 824 Various LinkOptions can be specified to further control the 825 attachment. 826 """ 827 if isinstance(context, _compat.STRING_TYPES): 828 context = Url(context) 829 if isinstance(context, Url) and not source: 830 source = context.path 831 session = self._get_session(context) 832 rcv = session.receiver(name or self._get_id(session.connection.container, source, target)) 833 if source: 834 rcv.source.address = source 835 if dynamic: 836 rcv.source.dynamic = True 837 if target: 838 rcv.target.address = target 839 if handler != None: 840 rcv.handler = handler 841 _apply_link_options(options, rcv) 842 rcv.open() 843 return rcv
844
845 - def declare_transaction(self, context, handler=None, settle_before_discharge=False):
846 if not _get_attr(context, '_txn_ctrl'): 847 class InternalTransactionHandler(OutgoingMessageHandler): 848 def __init__(self): 849 super(InternalTransactionHandler, self).__init__(auto_settle=True)
850 851 def on_settled(self, event): 852 if hasattr(event.delivery, "transaction"): 853 event.transaction = event.delivery.transaction 854 event.delivery.transaction.handle_outcome(event)
855 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler()) 856 context._txn_ctrl.target.type = Terminus.COORDINATOR 857 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions')) 858 return Transaction(context._txn_ctrl, handler, settle_before_discharge) 859
860 - def listen(self, url, ssl_domain=None):
861 """ 862 Initiates a server socket, accepting incoming AMQP connections 863 on the interface and port specified. 864 """ 865 url = Url(url) 866 acceptor = self.acceptor(url.host, url.port) 867 ssl_config = ssl_domain 868 if not ssl_config and url.scheme == 'amqps': 869 # use container's default server domain 870 if self.ssl: 871 ssl_config = self.ssl.server 872 else: 873 raise SSLUnavailable("amqps: SSL libraries not found") 874 if ssl_config: 875 acceptor.set_ssl_domain(ssl_config) 876 return acceptor
877
878 - def do_work(self, timeout=None):
879 if timeout: 880 self.timeout = timeout 881 return self.process()
882