I am using Firebase. There is a fixed limit of 100 snapshot streams per client.
I have written many database handlers that each return a stream of snapshots from Firebase Firestore. Over time, the app will definitely end up with more than 100 outgoing streams open at once, and I want to prevent that from happening.
Currently, I am looking to create a fixed pool of streams. When it hits 100 streams, any further attempts will error. But if you want to listen to a stream that has already been created, you can do so without creating a new stream instance.
Basically, the logic is:
- If stream already exists in stream pool, return that broadcast stream
- Otherwise, if the pool is full, error "pool is full"
- Otherwise, create the stream, put it into the pool, and return
And whenever a stream loses all of its listeners, it should automatically remove itself from the stream pool.
This is my current attempt, but I can't seem to figure it all out.
    import 'dart:async';
    import 'dart:collection';

    class StreamPool {
      /// This is the maximum number of streams that can hold
      /// a connection to Firebase, per client.
      static const int maxStreams = 100;

      final HashMap<String, StreamController> _streams = new HashMap();

      Stream<T> streamOrFail<T>(String key, Stream<T> Function() builder) {
        assert(key != null);
        assert(builder != null);

        // If we're already listening to this query, return that instance
        if (_streams.containsKey(key)) {
          return _streams[key].stream;
        }

        // If the pool is full, error
        if (_streams.length >= maxStreams) {
          throw new Exception("stream pool is full");
        }

        // Otherwise create a new stream
        var controller = StreamController.broadcast(
          onCancel: () {
            _streams[key].close();
            _streams.remove(key);
          },
        );
        controller.addStream(builder());
        _streams[key] = controller;
        return controller.stream;
      }
    }
The streamOrFail(key, builder) method is used to define methods that fetch data from Firebase. For example, it could be used like this:
    Stream<DocumentSnapshot> streamUser(String userID) =>
        pool.streamOrFail("stream_user", () => FirebaseFirestore.instance
            .collection("users")
            .doc(userID)
            .snapshots());
Each "query" would be uniquely identified by the key, in this case stream_user. This is what allows us to determine if we're already listening to that same query.
And finally, this would be used in something like a StreamBuilder:
    StreamBuilder(
      stream: api.streamUser("my-id"),
      builder: (context, userSnapshot) {
        // ...
      }
    );
And if it failed, then userSnapshot.hasError would be true, and I could handle that somehow.
However, I am concerned about using onCancel to automatically remove the stream when it doesn't have any more listeners. The stream controller is added to the HashMap before anything listens to it, so I'm worried that this will fire early.
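One way to check this concern in isolation might be a minimal sketch like the one below, which uses only dart:async and no Firebase. The function name traceLifecycle is hypothetical and exists only for this illustration. It records the order in which a broadcast controller's onListen and onCancel callbacks run relative to creating the controller and cancelling listeners. As I understand the documented behaviour, onCancel is called once there are no longer any active listeners, so it should not run merely because the controller exists before anyone listens.

```dart
import 'dart:async';

/// Hypothetical helper: logs when a broadcast controller's
/// lifecycle callbacks run, without touching Firebase.
Future<List<String>> traceLifecycle() async {
  final log = <String>[];
  final controller = StreamController<int>.broadcast(
    onListen: () => log.add('onListen'),
    onCancel: () => log.add('onCancel'),
  );

  // The controller exists but nothing is listening yet.
  log.add('created');

  // Attach two listeners, then cancel them one at a time.
  final a = controller.stream.listen((_) {});
  final b = controller.stream.listen((_) {});
  await a.cancel();
  log.add('one listener left');
  await b.cancel(); // the last listener is now gone

  await controller.close();
  return log;
}

Future<void> main() async {
  print(await traceLifecycle());
}
```

This only exercises the controller on its own. It does not cover how calling close() inside onCancel interacts with the addStream call in streamOrFail, which would still need to be checked separately.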
Does anyone have any thoughts? Is this a good/working solution, or am I approaching this problem in completely the wrong way?
question from:
https://stackoverflow.com/questions/66058572/flutter-firebase-limit-number-of-snapshot-stream-listeners-per-client