Quickstart¶
Code organization¶
aio-sf-streaming is designed in a modular way:
BaseSalesforceStreaming
is the base class of the package. It implement the main logic of the streaming API flow. It is an abstract class, you can not use it directly, the class lake of connection capability. You must use one of the connector implementation.Connectors add connection capabilities to
BaseSalesforceStreaming
allowing to connect to Salesforce.PasswordSalesforceStreaming
allow to connect on Salesforce with password flow.RefreshTokenSalesforceStreaming
allow to connect on Salesforce with refresh token flow.Mixins are provided and can be added to concrete implementation to provide additional capabilities like replay support or use the provided timeout advice. This functionalities can be added by sub-classing a connector and add mixin to your concrete implementation.
Helper class like
SimpleSalesforceStreaming
simplify implementation with an “all-in-one” class implementation.
Asyncronous and Asyncio¶
Salesforce connection¶
BaseSalesforceStreaming
allow you to connect with user name and
password of the user and client id and secret from the connected app.
Constructor does not establish any connection, you needs to call
BaseSalesforceStreaming.start()
to connect to Salesforce and
start Bayeux/CometD protocol. Call :BaseSalesforceStreaming.stop()
to disconnect and stop connection.
client = SimpleSalesforceStreaming(
username='username',
password='password',
client_id='client_id',
client_secret='client_secret')
await client.start()
# process events
await client.stop()
Most of the time, you should not call theses methods directly, you should use the asynchronous context manager interface that will call all of these for you:
async with SimpleSalesforceStreaming(
username='username',
password='password',
client_id='client_id',
client_secret='client_secret') as client:
# process events
Subscribe to events¶
Two methods BaseSalesforceStreaming.subscribe()
and BaseSalesforceStreaming.unsubscribe()
allow you to start receiving new events from a push topic or a generic streaming event
and stop when you does not want to receive event anymore.
async with SimpleSalesforceStreaming(**credentials) as client:
# Subscribe to push topic
await client.subscribe('/topic/Foo')
# Subscribe to generic event
await client.subscribe('/u/MyEvent')
# Process events
# Unsubscribe from push topic
await client.unsubscribe('/topic/Foo')
# Unsubscribe from generic event
await client.unsubscribe('/u/MyEvent')
You can subscribe and unsubscribe at any moment and on other coroutine as soon as the connection is established. You can even start to process without waiting the response:
async def process(loop):
async with SimpleSalesforceStreaming(**credentials, loop=loop) as client:
loop.create_task(client.subscribe('/topic/Foo'))
loop.create_task(client.subscribe('/topic/Bar'))
# Process events
loop = asyncio.get_event_loop()
loop.run_until_complete(process(loop))
Receive events¶
BaseSalesforceStreaming.messages()
and BaseSalesforceStreaming.events()
are used to iterate over events when their are received. The main difference is
that BaseSalesforceStreaming.messages()
provide all events, whereas
BaseSalesforceStreaming.events()
filter internal messages and provide
only the events for channel you subscribed.
Both methods are asynchronous generator and should be iterate with async for:
async with SimpleSalesforceStreaming(**credentials) as client:
await client.subscribe('/topic/Foo')
await client.subscribe('/topic/Bar')
async for event in client.events():
channel = event['channel']
print(f"Received an event from {channel} : {event}")
Warning
Linked to the underlying protocol, long-pooling based, the client should reconnect as soon as possible. Practically, client have 40 seconds to reconnect. If your processing take a longer time, a new connection should be made. You should avoid doing long processing between each iteration or launch this processing into a background task.
The processing loop is infinite by default. Inside the loop, you can stop easily with a break:
async with SimpleSalesforceStreaming(**credentials) as client:
await client.subscribe('/topic/Foo')
await client.subscribe('/topic/Bar')
async for event in client.events():
channel = event['channel']
if channel == '/topic/Foo':
break
else:
print(event)
Outside the main loop, you can call BaseSalesforceStreaming.ask_stop()
to stop the loop as soon as is possible, even if your loop is waiting for a new
message. Please note that, due to the underlying protocol, this can take some
time to really happen (the code must wait a timeout from the server, can be as
long as 2min).
Replay support¶
ReplayMixin
add support of 24 hours events replay.
Each event is associated with an unique id by channel. To support replay, you
must override two methods: ReplayMixin.store_replay_id()
and ReplayMixin.get_last_replay_id()
.
ReplayMixin.store_replay_id()
is called for each received event. The
method is called with three arguments:
the channel,
the replay id,
the object creation time (the string provided by SF).
For each channel, this function should store the replay id of the last created object.
ReplayMixin.get_last_replay_id()
will be called to retrieve the last
replay id for a specific channel. In addition of a specific id, this function
can return two special values from the ReplayType
enum to replay
all available events (24 hours history) or only new events after subscription.
The next example will store replay id in memory. In real world application you should store this id in a persistent way:
class MyClient(SimpleSalesforceStreaming):
def __init__(*args, **kwargs):
self.replays = {}
super().__init__(*args, **kwargs)
async def store_replay_id(self, channel, replay_id, creation_time):
# we does not want to store the replay id if a most recent one is
# already stored
last_storage = self.replays.get(channel, None)
creation_time = parse_time(creation_time) # Custom function to implement
if last_storage and last_storage[0] > creation_time:
return
self.replays[channel] = (creation_time, replay_id)
async def get_last_replay_id(self, channel):
# Retrieve last replay
last_storage = self.replays.get(channel, None)
# If we have not any stored replay id, we can either replay all
# events or only subscribe to new ones.
if not last_storage:
return ReplayType.NEW_EVENTS
return last_storage[1]