Developer Interface

This part of the documentation covers all the interfaces of aio_sf_streaming.

Code organization

The package is organized into several main parts:

  • High-level classes and helpers provide a client that works out of the box. See the Main Interface section.

  • The Base class section describes the low-level base class that implements the main client logic.

  • To authenticate with Salesforce, you must use a connector that adds authentication capability to BaseSalesforceStreaming. See the Connectors section for a list of available connectors.

  • Finally, mixins extend the capabilities of BaseSalesforceStreaming and can be added as opt-in features by subclassing.

Main Interface

class aio_sf_streaming.SimpleSalesforceStreaming(username, password, client_id, client_secret, *, sandbox=False, version='42.0', loop=None, connector=None, login_connector=None, retry_sub_duration=0.1, retry_factor=1.0, retry_max_duration=30.0, retry_max_count=20)[source]

A simple helper class providing all-in-one functionality, authenticating with a username and password.

Parameters
  • username (str) – User login name

  • password (str) – User password

  • client_id (str) – OAuth2 client Id

  • client_secret (str) – OAuth2 client secret

  • sandbox (bool) – If True, the connection is made to a sandbox, through https://test.salesforce.com instead of the main login route at https://login.salesforce.com.

  • version (str) – The API version to use. For example '42.0'.

  • loop (Optional[AbstractEventLoop]) – Asyncio loop used

  • connector (Optional[BaseConnector]) – aiohttp connector used for the main session. Mainly used for testing purposes.

  • login_connector (Optional[BaseConnector]) – aiohttp connector used during connection. Mainly used for testing purposes.

  • retry_sub_duration (float) – Duration between subscribe retries if the server is too busy.

  • retry_factor (float) – Multiplication factor applied to the duration between successive retries

  • retry_max_duration (float) – Maximum value of the retry duration

  • retry_max_count (int) – Maximum number of retries. Once this count is reached, the response or exception is propagated.

Usage example:

import asyncio

from aio_sf_streaming import ReplayType, SimpleSalesforceStreaming


class MyClient(SimpleSalesforceStreaming):
    def __init__(self):
        self.replays = []
        super().__init__(username='my-username',
                         password='my-password',
                         client_id='my-client-id',
                         client_secret='my-client-secret')

    async def store_replay_id(self, channel, replay_id, creation_time):
        # Keep every replay id in memory; this example never reads them back
        self.replays.append((channel, replay_id, creation_time))

    async def get_last_replay_id(self, channel):
        # Ask for new events only (no replay)
        return ReplayType.NEW_EVENTS


async def print_events():
    async with MyClient() as client:
        await client.subscribe('/topic/Foo')
        async for message in client.events():
            channel = message['channel']
            print(f"Message received on {channel} : {message}")


loop = asyncio.get_event_loop()
loop.run_until_complete(print_events())

SimpleSalesforceStreaming inherits all members from its base classes. Only the main ones, intended for external use, are listed here.

coroutine start(self)

See BaseSalesforceStreaming.start()

Return type

None

coroutine subscribe(self, channel)

See BaseSalesforceStreaming.subscribe()

Return type

List[Dict[str, Any]]

async-for messages(self)

See BaseSalesforceStreaming.messages()

Return type

Dict[str, Any]

async-for events(self)

Asynchronous generator that fetches new events and yields each one as soon as it is available:

async for message in client.events():
    channel = message['channel']
    print(channel, ':', message)

This method differs from BaseSalesforceStreaming.messages() in that it filters messages and provides only those related to the channels you subscribed to.

Return type

Dict[str, Any]

coroutine store_replay_id(self, channel, replay_id, creation_time)

Callback called to store a replay id. You should override this method to implement your custom logic.

Parameters
  • channel (str) – Channel name

  • replay_id (int) – replay id to store

  • creation_time (str) – Creation time of the event, as the string provided by Salesforce. You should store only the replay id of the most recently created event, and this value is the only way to know whether events were received in order.

Return type

None

coroutine get_last_replay_id(self, channel)

Callback called to retrieve a replay id. You should override this method to implement your custom logic.

Parameters

channel (str) – Channel name

Return type

Union[ReplayType, int]

coroutine ask_stop(self)

Ask the client to stop receiving events:

async for event in client.events():
    ...
    if ...:
        await client.ask_stop()

This call will eventually stop the BaseSalesforceStreaming.messages() and BaseSalesforceStreaming.events() async generators, but this can take some time if it is not called inside the loop body: the generator will wait for a timeout response from the Salesforce server.

Return type

None

coroutine unsubscribe(self, channel)

See BaseSalesforceStreaming.unsubscribe()

Return type

List[Dict[str, Any]]

coroutine stop(self)

See BaseSalesforceStreaming.stop()

Return type

None

class aio_sf_streaming.SimpleRefreshTokenSalesforceStreaming(refresh_token, client_id, client_secret, *, sandbox=False, version='42.0', loop=None, connector=None, login_connector=None, retry_sub_duration=0.1, retry_factor=1.0, retry_max_duration=30.0, retry_max_count=20)[source]

A simple helper class providing all-in-one functionality, authenticating with a refresh token.

Parameters
  • refresh_token (str) – Refresh token

  • client_id (str) – OAuth2 client Id

  • client_secret (str) – OAuth2 client secret

  • sandbox (bool) – If True, the connection is made to a sandbox, through https://test.salesforce.com instead of the main login route at https://login.salesforce.com.

  • version (str) – The API version to use. For example '42.0'.

  • loop (Optional[AbstractEventLoop]) – Asyncio loop used

  • connector (Optional[BaseConnector]) – aiohttp connector used for the main session. Mainly used for testing purposes.

  • login_connector (Optional[BaseConnector]) – aiohttp connector used during connection. Mainly used for testing purposes.

  • retry_sub_duration (float) – Duration between subscribe retries if the server is too busy.

  • retry_factor (float) – Multiplication factor applied to the duration between successive retries

  • retry_max_duration (float) – Maximum value of the retry duration

  • retry_max_count (int) – Maximum number of retries. Once this count is reached, the response or exception is propagated.

Usage example:

import asyncio

from aio_sf_streaming import ReplayType, SimpleRefreshTokenSalesforceStreaming


class MyClient(SimpleRefreshTokenSalesforceStreaming):
    def __init__(self):
        self.replays = []
        super().__init__(refresh_token='refresh_token',
                         client_id='my-client-id',
                         client_secret='my-client-secret')

    async def store_replay_id(self, channel, replay_id, creation_time):
        # Keep every replay id in memory; this example never reads them back
        self.replays.append((channel, replay_id, creation_time))

    async def get_last_replay_id(self, channel):
        # Ask for new events only (no replay)
        return ReplayType.NEW_EVENTS


async def print_events():
    async with MyClient() as client:
        await client.subscribe('/topic/Foo')
        async for message in client.events():
            channel = message['channel']
            print(f"Message received on {channel} : {message}")


loop = asyncio.get_event_loop()
loop.run_until_complete(print_events())

SimpleRefreshTokenSalesforceStreaming inherits all members from its base classes. Only the main ones, intended for external use, are listed here. See SimpleSalesforceStreaming for method descriptions.

Base class

class aio_sf_streaming.BaseSalesforceStreaming(*, sandbox=False, version='42.0', loop=None, connector=None)[source]

Base low-level aio-sf-streaming class.

Cannot be used directly: it must be subclassed with at least one connector implementation. The class provides basic functionality; additional functionality can be added with the provided mixins.

The main logic is implemented here, but you should not use this class directly.

Parameters
  • sandbox (bool) – If True, the connection is made to a sandbox, through https://test.salesforce.com instead of the main login route at https://login.salesforce.com.

  • version (str) – The API version to use. For example '42.0'.

  • loop (Optional[AbstractEventLoop]) – Asyncio loop used

  • connector (Optional[BaseConnector]) – aiohttp connector used for the main session. Mainly used for testing purposes.

This class supports the context manager protocol for self closing.

All main members are coroutines, even when the default implementation does not make any asynchronous call. With this convention, subclasses and mixins can easily override these members and make complex calls.

See SimpleSalesforceStreaming for a usage example.
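The coroutine convention described above can be illustrated with stand-in classes. This is a minimal sketch of the pattern only: `Base`, `TrackingMixin` and `Client` are hypothetical names, not classes from aio_sf_streaming.

```python
import asyncio


class Base:
    """Stand-in for a base client; not the real BaseSalesforceStreaming."""

    async def subscribe(self, channel):
        # The default implementation does no real I/O but is still a coroutine.
        return [{"channel": "/meta/subscribe", "subscription": channel}]


class TrackingMixin:
    """Hypothetical mixin that wraps subscribe() with extra asynchronous work."""

    async def subscribe(self, channel):
        await asyncio.sleep(0)  # placeholder for an asynchronous call
        response = await super().subscribe(channel)
        self.subscribed = getattr(self, "subscribed", []) + [channel]
        return response


class Client(TrackingMixin, Base):
    """Mixins are listed first so that their overrides wrap the base class."""
```

Because every member is already a coroutine, the mixin can await other calls without changing the signature seen by callers.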

High level api

coroutine start(self)[source]

Connect to Salesforce, authenticate and initialize the CometD connection.

A best practice is to use the async context manager interface, which calls this method for you.

Return type

None

coroutine subscribe(self, channel)[source]

Subscribe to a channel. Can be used directly:

await client.subscribe('/topic/Foo')

This method, and the underlying protocol, are safe to start as a background task:

loop.create_task(client.subscribe('/topic/Foo'))

Return type

List[Dict[str, Any]]
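Since subscribe() can run as a background task, several channels can be subscribed concurrently. The helper below is a sketch, not part of the library: `subscribe_all` is a hypothetical name, and it only assumes that `client` exposes the subscribe() coroutine documented above.

```python
import asyncio


async def subscribe_all(client, channels):
    """Subscribe to every channel concurrently; return {channel: response}."""
    channels = list(channels)
    # gather() schedules each subscribe() coroutine as its own task.
    responses = await asyncio.gather(*(client.subscribe(c) for c in channels))
    return dict(zip(channels, responses))
```

A call such as `await subscribe_all(client, ['/topic/Foo', '/topic/Bar'])` would then return one response list per channel.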

async-for messages(self)[source]

Asynchronous generator that fetches new messages and yields each one as soon as it is available:

async for message in client.messages():
    channel = message['channel']
    print(channel, ':', message)

This method iterates over all messages, including internal/meta ones. If you only want to iterate over messages from channels you subscribed to, use BaseSalesforceStreaming.events().

Warning

Because the underlying protocol is based on long polling, the client should reconnect as soon as possible. In practice, the client has 40 seconds to reconnect. If your processing takes longer, a new connection has to be made. Avoid long processing between iterations, or launch this processing in a background task.

Return type

Dict[str, Any]
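One way to follow the warning above is to hand each message to a background task so that the loop returns to the generator immediately. This is a sketch under assumptions: `consume` and `handler` are hypothetical names, and `messages` stands for any async iterator such as client.messages().

```python
import asyncio


async def consume(messages, handler):
    """Run handler(message) in a background task for each received message."""
    pending = set()
    async for message in messages:
        task = asyncio.ensure_future(handler(message))
        pending.add(task)  # keep a reference until the task is done
        task.add_done_callback(pending.discard)
    # Wait for the handlers that are still running once iteration stops.
    if pending:
        await asyncio.gather(*pending)
```

The loop body never awaits the handler itself, so slow processing does not delay the next reconnection. Error handling for failed handlers is left out of this sketch.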

async-for events(self)[source]

Asynchronous generator that fetches new events and yields each one as soon as it is available:

async for message in client.events():
    channel = message['channel']
    print(channel, ':', message)

This method differs from BaseSalesforceStreaming.messages() in that it filters messages and provides only those related to the channels you subscribed to.

Return type

Dict[str, Any]
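The difference between messages() and events() can be pictured as a filter over the message stream. The sketch below is an illustration only: `events_for` is a hypothetical helper, and the library's actual filtering rule may differ.

```python
async def events_for(messages, subscribed):
    """Yield only the messages whose channel is in the `subscribed` set."""
    async for message in messages:
        if message.get("channel") in subscribed:
            yield message
```

Meta messages such as those on '/meta/connect' are dropped because their channel is never in the subscribed set.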

coroutine ask_stop(self)[source]

Ask the client to stop receiving events:

async for event in client.events():
    ...
    if ...:
        await client.ask_stop()

This call will eventually stop the BaseSalesforceStreaming.messages() and BaseSalesforceStreaming.events() async generators, but this can take some time if it is not called inside the loop body: the generator will wait for a timeout response from the Salesforce server.

Return type

None

coroutine unsubscribe(self, channel)[source]

Unsubscribe from a channel. Can be used directly:

await client.unsubscribe('/topic/Foo')

This method, and the underlying protocol, are safe to start as a background task:

loop.create_task(client.unsubscribe('/topic/Foo'))

Return type

List[Dict[str, Any]]

coroutine stop(self)[source]

Disconnect from Salesforce and close the underlying connection.

A best practice is to use the async context manager interface, which calls this method for you.

Return type

None

Connection logic

token_url

The url that should be used to fetch an access token.

Return type

str
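As an illustration of how the sandbox flag could select the token URL, here is a minimal sketch. The two hosts come from the sandbox parameter description; the '/services/oauth2/token' path is the standard Salesforce OAuth2 token endpoint and is an assumption about what this property returns. The function name is hypothetical.

```python
def build_token_url(sandbox=False):
    """Return the OAuth2 token URL for the sandbox or production login host."""
    host = "https://test.salesforce.com" if sandbox else "https://login.salesforce.com"
    # Assumed path: the standard Salesforce OAuth2 token endpoint.
    return host + "/services/oauth2/token"
```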

coroutine fetch_token(self)[source]

Abstract coroutine, implemented by connectors, that must return an access token and the associated instance URL.

Return type

Tuple[str, str]

coroutine create_connected_session(self)[source]

This coroutine creates an aiohttp.ClientSession using the fetched token.

Return type

ClientSession

coroutine close_session(self)[source]

Close the underlying aiohttp.ClientSession connection

Return type

None

Bayeux/CometD logic layer

end_point

CometD endpoint

Return type

str

coroutine get_handshake_payload(self)[source]

Provide the handshake payload

Return type

Dict[str, Any]

coroutine get_subscribe_payload(self, channel)[source]

Provide the subscription payload for a specific channel

Return type

Dict[str, Any]

coroutine get_unsubscribe_payload(self, channel)[source]

Provide the unsubscription payload for a specific channel

Return type

Dict[str, Any]
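For orientation, the three payload coroutines above correspond to Bayeux meta messages. The functions below sketch the minimal fields defined by the Bayeux protocol. They are hypothetical stand-ins, and the payloads the library actually builds may carry more fields (for example a client id or a message id).

```python
def handshake_payload():
    """Minimal Bayeux handshake message for a long-polling client."""
    return {
        "channel": "/meta/handshake",
        "version": "1.0",
        "minimumVersion": "1.0",
        "supportedConnectionTypes": ["long-polling"],
    }


def subscribe_payload(channel):
    """Minimal Bayeux subscription message for one channel."""
    return {"channel": "/meta/subscribe", "subscription": channel}


def unsubscribe_payload(channel):
    """Minimal Bayeux unsubscription message for one channel."""
    return {"channel": "/meta/unsubscribe", "subscription": channel}
```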

coroutine send(self, data)[source]

Send data to CometD server when the connection is established:

# Manually disconnect
await client.send({'channel': '/meta/disconnect'})

Return type

Union[Dict[str, Any], List[Dict[str, Any]]]

coroutine handshake(self)[source]

Coroutine that performs a handshake (mandatory before any other action).

Return type

List[Dict[str, Any]]

coroutine disconnect(self)[source]

Disconnect from the SF streaming server

Return type

List[Dict[str, Any]]

I/O layer helpers

coroutine get(self, sub_url, **kwargs)[source]

Perform a simple JSON GET request on an internal URL:

response = await client.get('/myendpoint/')

Return type

Union[Dict[str, Any], List[Dict[str, Any]]]

coroutine post(self, sub_url, **kwargs)[source]

Perform a simple JSON POST request on an internal URL:

response = await client.post('/myendpoint/', json={'data': 'foo'})

Return type

Union[Dict[str, Any], List[Dict[str, Any]]]

coroutine request(self, method, sub_url, **kwargs)[source]

Perform a simple JSON request on an internal URL

Return type

Union[Dict[str, Any], List[Dict[str, Any]]]

Other attributes

loop

Running event loop

Return type

AbstractEventLoop

Connectors

class aio_sf_streaming.BaseConnector(*, client_id=None, client_secret=None, login_connector=None, **kwargs)[source]

Base class for all sf connectors.

Parameters
  • client_id (Optional[str]) – OAuth2 client Id (mandatory)

  • client_secret (Optional[str]) – OAuth2 client secret (mandatory)

  • login_connector (Optional[BaseConnector]) – aiohttp connector used during connection. Mainly used for testing purposes.

See BaseSalesforceStreaming for other keyword arguments.

class aio_sf_streaming.PasswordSalesforceStreaming(*, username=None, password=None, **kwargs)[source]

Create a SF streaming manager with password flow connection.

Main arguments are connection credentials:

Parameters
  • username (Optional[str]) – User login name

  • password (Optional[str]) – User password

See BaseConnector for other keyword arguments.

class aio_sf_streaming.RefreshTokenSalesforceStreaming(*, refresh_token=None, **kwargs)[source]

Create a SF streaming manager with refresh token flow connection.

Main arguments are connection credentials:

Parameters

refresh_token (Optional[str]) – Refresh token

See BaseConnector for other keyword arguments.

Mixins

class aio_sf_streaming.AllMixin(*args, **kwargs)[source]

Helper class that adds all mixins at once.

class aio_sf_streaming.TimeoutAdviceMixin[source]

Simple mixin that automatically sets the timeout setting according to Salesforce advice, if provided.
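In the Bayeux protocol, a server can attach an "advice" object to a message, with a "timeout" value in milliseconds. The sketch below shows how such a value could be read. `advised_timeout` is a hypothetical helper, and how the mixin applies the value is not described here.

```python
def advised_timeout(message, default=None):
    """Return the timeout advised by the server, in seconds, or `default`."""
    timeout_ms = message.get("advice", {}).get("timeout")
    if timeout_ms is None:
        return default
    return timeout_ms / 1000  # Bayeux expresses the timeout in milliseconds
```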

class aio_sf_streaming.ReplayType(value)[source]

Enumeration with special replay values

ALL_EVENTS = -2

Replay all events available.

NEW_EVENTS = -1

No replay, retrieve only new events.

class aio_sf_streaming.ReplayMixin[source]

Mixin adding replay support to the streaming client.

This mixin is not enough on its own: you must implement ReplayMixin.store_replay_id() and ReplayMixin.get_last_replay_id() in a subclass to get working replay support.

coroutine get_last_replay_id(self, channel)[source]

Callback called to retrieve a replay id. You should override this method to implement your custom logic.

Parameters

channel (str) – Channel name

Return type

Union[ReplayType, int]

coroutine store_replay_id(self, channel, replay_id, creation_time)[source]

Callback called to store a replay id. You should override this method to implement your custom logic.

Parameters
  • channel (str) – Channel name

  • replay_id (int) – replay id to store

  • creation_time (str) – Creation time of the event, as the string provided by Salesforce. You should store only the replay id of the most recently created event, and this value is the only way to know whether events were received in order.

Return type

None
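The two callbacks can be sketched with an in-memory store that keeps only the most recently created event per channel. This is an illustration under assumptions: `LatestReplayStore` is a hypothetical class, and it assumes that all creation_time strings share one ISO 8601 format so that string comparison follows chronological order.

```python
class LatestReplayStore:
    """In-memory sketch of the two replay callbacks (hypothetical helper)."""

    def __init__(self, default=-1):
        # -1 is the value of ReplayType.NEW_EVENTS in the enumeration above.
        self.default = default
        self.latest = {}  # channel -> (creation_time, replay_id)

    async def store_replay_id(self, channel, replay_id, creation_time):
        current = self.latest.get(channel)
        # Ignore events that were created before the one already stored.
        if current is None or creation_time >= current[0]:
            self.latest[channel] = (creation_time, replay_id)

    async def get_last_replay_id(self, channel):
        current = self.latest.get(channel)
        return self.default if current is None else current[1]
```

In a real client these two methods would be defined on the subclass itself, and the store would typically be persisted so that replay survives a restart.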

class aio_sf_streaming.AutoVersionMixin[source]

Simple mixin that fetches the latest API version before connecting.
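Salesforce lists its available API versions as JSON objects that each carry a "version" string. Assuming the mixin works from such a list (this page does not say how it fetches it), choosing the latest version could look like the sketch below. `latest_version` is a hypothetical helper.

```python
def latest_version(versions):
    """Pick the highest 'version' string from a list of version descriptions."""
    # Compare numerically so that '9.0' does not sort above '42.0'.
    return max((v["version"] for v in versions), key=float)
```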

class aio_sf_streaming.AutoReconnectMixin(*args, **kwargs)[source]

Mixin that automatically reconnects when asked to by Salesforce.

class aio_sf_streaming.ReSubscribeMixin(retry_sub_duration=0.1, retry_factor=1.0, retry_max_duration=30.0, retry_max_count=20, **kwargs)[source]

Mixin that handles subscription errors and tries again after a short delay.

Parameters
  • retry_sub_duration (float) – Duration between subscribe retries if the server is too busy (initial value).

  • retry_factor (float) – Multiplication factor applied to the duration between successive retries

  • retry_max_duration (float) – Maximum value of the retry duration

  • retry_max_count (int) – Maximum number of retries. Once this count is reached, the response or exception is propagated.
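One plausible reading of these four parameters is a capped geometric delay schedule. The sketch below assumes that the delay starts at retry_sub_duration, is multiplied by retry_factor after each attempt, and never exceeds retry_max_duration. The exact formula used by the mixin is not given here, and `retry_delays` is a hypothetical helper.

```python
def retry_delays(retry_sub_duration=0.1, retry_factor=1.0,
                 retry_max_duration=30.0, retry_max_count=20):
    """Yield the delay before each retry under the assumptions stated above."""
    delay = retry_sub_duration
    for _ in range(retry_max_count):
        yield min(delay, retry_max_duration)
        delay *= retry_factor
```

With the default retry_factor of 1.0 the delay stays constant. A factor above 1.0 gives an exponential backoff that flattens once it reaches the cap.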

coroutine should_retry_on_error_response(self, channel, response)[source]

Callback called to process a response with an error message. Returns a boolean indicating whether to retry. If False is returned, the response is returned to the caller.

By default, retries on known ‘server unavailable’ responses.

Parameters
  • channel (str) – Channel name

  • response (Dict[str, Any]) – The response received

Return type

bool
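An override of this callback needs a rule for recognizing a retryable response. The predicate below is a sketch, not the library's built-in check: it assumes a Bayeux-style "error" field that starts with a numeric code, such as '503::Server is too busy', and `is_server_unavailable` is a hypothetical name.

```python
def is_server_unavailable(response):
    """Return True when an unsuccessful response carries a 503 error code."""
    if response.get("successful", True):
        return False
    error = response.get("error", "")
    # Assumed format: '<code>:<args>:<message>'.
    return error.split(":", 1)[0].strip() == "503"
```

A subclass could return `is_server_unavailable(response)` from should_retry_on_error_response().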

coroutine should_retry_on_exception(self, channel, exception)[source]

Callback called to process an exception raised during subscription. Returns a boolean indicating whether to retry. If False is returned, the exception is propagated to the caller.

By default, always returns False.

Parameters
  • channel (str) – Channel name

  • exception (Exception) – The exception raised

Return type

bool
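Since the default never retries on exceptions, a subclass can opt in for errors it considers transient. The mixin below is a hypothetical sketch: the class name and the choice of exception types are assumptions, and it would have to be combined with ReSubscribeMixin in a real client.

```python
import asyncio


class RetryOnNetworkErrorMixin:
    """Hypothetical mixin: retry when the exception looks transient."""

    # Exception types treated as transient in this sketch.
    TRANSIENT = (asyncio.TimeoutError, ConnectionError)

    async def should_retry_on_exception(self, channel, exception):
        return isinstance(exception, self.TRANSIENT)
```

Errors raised by aiohttp generally derive from aiohttp.ClientError, which could be added to the tuple if those should be retried as well.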