Code organization

aio-sf-streaming is designed in a modular way:

Asyncronous and Asyncio

Salesforce connection

BaseSalesforceStreaming allow you to connect with user name and password of the user and client id and secret from the connected app.

Constructor does not establish any connection, you needs to call BaseSalesforceStreaming.start() to connect to Salesforce and start Bayeux/CometD protocol. Call :BaseSalesforceStreaming.stop() to disconnect and stop connection.

client = SimpleSalesforceStreaming(
await client.start()
# process events
await client.stop()

Most of the time, you should not call theses methods directly, you should use the asynchronous context manager interface that will call all of these for you:

async with SimpleSalesforceStreaming(
                client_secret='client_secret') as client:
    # process events

Subscribe to events

Two methods BaseSalesforceStreaming.subscribe() and BaseSalesforceStreaming.unsubscribe() allow you to start receiving new events from a push topic or a generic streaming event and stop when you does not want to receive event anymore.

async with SimpleSalesforceStreaming(**credentials) as client:
    # Subscribe to push topic
    await client.subscribe('/topic/Foo')
    # Subscribe to generic event
    await client.subscribe('/u/MyEvent')

    # Process events

    # Unsubscribe from push topic
    await client.unsubscribe('/topic/Foo')
    # Unsubscribe from generic event
    await client.unsubscribe('/u/MyEvent')

You can subscribe and unsubscribe at any moment and on other coroutine as soon as the connection is established. You can even start to process without waiting the response:

async def process(loop):
    async with SimpleSalesforceStreaming(**credentials, loop=loop) as client:

        # Process events

loop = asyncio.get_event_loop()

Receive events

BaseSalesforceStreaming.messages() and BaseSalesforceStreaming.events() are used to iterate over events when their are received. The main difference is that BaseSalesforceStreaming.messages() provide all events, whereas BaseSalesforceStreaming.events() filter internal messages and provide only the events for channel you subscribed.

Both methods are asynchronous generator and should be iterate with async for:

async with SimpleSalesforceStreaming(**credentials) as client:
    await client.subscribe('/topic/Foo')
    await client.subscribe('/topic/Bar')

    async for event in client.events():
        channel = event['channel']
        print(f"Received an event from {channel} : {event}")


Linked to the underlying protocol, long-pooling based, the client should reconnect as soon as possible. Practically, client have 40 seconds to reconnect. If your processing take a longer time, a new connection should be made. You should avoid doing long processing between each iteration or launch this processing into a background task.

The processing loop is infinite by default. Inside the loop, you can stop easily with a break:

async with SimpleSalesforceStreaming(**credentials) as client:
    await client.subscribe('/topic/Foo')
    await client.subscribe('/topic/Bar')

    async for event in client.events():
        channel = event['channel']
        if channel == '/topic/Foo':

Outside the main loop, you can call BaseSalesforceStreaming.ask_stop() to stop the loop as soon as is possible, even if your loop is waiting for a new message. Please note that, due to the underlying protocol, this can take some time to really happen (the code must wait a timeout from the server, can be as long as 2min).

Replay support

ReplayMixin add support of 24 hours events replay. Each event is associated with an unique id by channel. To support replay, you must override two methods: ReplayMixin.store_replay_id() and ReplayMixin.get_last_replay_id().

ReplayMixin.store_replay_id() is called for each received event. The method is called with three arguments:

  • the channel,

  • the replay id,

  • the object creation time (the string provided by SF).

For each channel, this function should store the replay id of the last created object.

ReplayMixin.get_last_replay_id() will be called to retrieve the last replay id for a specific channel. In addition of a specific id, this function can return two special values from the ReplayType enum to replay all available events (24 hours history) or only new events after subscription.

The next example will store replay id in memory. In real world application you should store this id in a persistent way:

class MyClient(SimpleSalesforceStreaming):
    def __init__(*args, **kwargs):
        self.replays = {}
        super().__init__(*args, **kwargs)

    async def store_replay_id(self, channel, replay_id, creation_time):
        # we does not want to store the replay id if a most recent one is
        # already stored
        last_storage = self.replays.get(channel, None)
        creation_time = parse_time(creation_time)       # Custom function to implement
        if last_storage and last_storage[0] > creation_time:
        self.replays[channel] = (creation_time, replay_id)

    async def get_last_replay_id(self, channel):
        # Retrieve last replay
        last_storage = self.replays.get(channel, None)
        # If we have not any stored replay id, we can either replay all
        # events or only subscribe to new ones.
        if not last_storage:
            return ReplayType.NEW_EVENTS
        return last_storage[1]