1use crate::Channel;
2use anyhow::Result;
3use client::{Client, Collaborator, UserStore};
4use collections::HashMap;
5use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle};
6use rpc::{
7 proto::{self, PeerId},
8 TypedEnvelope,
9};
10use std::sync::Arc;
11use util::ResultExt;
12
13pub(crate) fn init(client: &Arc<Client>) {
14 client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
15 client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
16}
17
18pub struct ChannelBuffer {
19 pub(crate) channel: Arc<Channel>,
20 connected: bool,
21 collaborators: HashMap<PeerId, Collaborator>,
22 user_store: ModelHandle<UserStore>,
23 buffer: ModelHandle<language::Buffer>,
24 buffer_epoch: u64,
25 client: Arc<Client>,
26 subscription: Option<client::Subscription>,
27}
28
/// Events emitted by a [`ChannelBuffer`] model.
pub enum ChannelBufferEvent {
    /// The collaborator set was replaced.
    CollaboratorsChanged,
    /// The buffer transitioned from connected to disconnected.
    Disconnected,
}
33
34impl Entity for ChannelBuffer {
35 type Event = ChannelBufferEvent;
36
37 fn release(&mut self, _: &mut AppContext) {
38 if self.connected {
39 self.client
40 .send(proto::LeaveChannelBuffer {
41 channel_id: self.channel.id,
42 })
43 .log_err();
44 }
45 }
46}
47
48impl ChannelBuffer {
49 pub(crate) async fn new(
50 channel: Arc<Channel>,
51 client: Arc<Client>,
52 user_store: ModelHandle<UserStore>,
53 mut cx: AsyncAppContext,
54 ) -> Result<ModelHandle<Self>> {
55 let response = client
56 .request(proto::JoinChannelBuffer {
57 channel_id: channel.id,
58 })
59 .await?;
60
61 let base_text = response.base_text;
62 let operations = response
63 .operations
64 .into_iter()
65 .map(language::proto::deserialize_operation)
66 .collect::<Result<Vec<_>, _>>()?;
67
68 let buffer = cx.add_model(|_| {
69 language::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text)
70 });
71 buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
72
73 let subscription = client.subscribe_to_entity(channel.id)?;
74
75 anyhow::Ok(cx.add_model(|cx| {
76 cx.subscribe(&buffer, Self::on_buffer_update).detach();
77
78 let mut this = Self {
79 buffer,
80 buffer_epoch: response.epoch,
81 client,
82 connected: true,
83 collaborators: Default::default(),
84 channel,
85 subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())),
86 user_store,
87 };
88 this.replace_collaborators(response.collaborators, cx);
89 this
90 }))
91 }
92
93 pub fn user_store(&self) -> &ModelHandle<UserStore> {
94 &self.user_store
95 }
96
97 pub(crate) fn replace_collaborators(
98 &mut self,
99 collaborators: Vec<proto::Collaborator>,
100 cx: &mut ModelContext<Self>,
101 ) {
102 let mut new_collaborators = HashMap::default();
103 for collaborator in collaborators {
104 if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
105 new_collaborators.insert(collaborator.peer_id, collaborator);
106 }
107 }
108
109 for (_, old_collaborator) in &self.collaborators {
110 if !new_collaborators.contains_key(&old_collaborator.peer_id) {
111 self.buffer.update(cx, |buffer, cx| {
112 buffer.remove_peer(old_collaborator.replica_id as u16, cx)
113 });
114 }
115 }
116 self.collaborators = new_collaborators;
117 cx.emit(ChannelBufferEvent::CollaboratorsChanged);
118 cx.notify();
119 }
120
121 async fn handle_update_channel_buffer(
122 this: ModelHandle<Self>,
123 update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
124 _: Arc<Client>,
125 mut cx: AsyncAppContext,
126 ) -> Result<()> {
127 let ops = update_channel_buffer
128 .payload
129 .operations
130 .into_iter()
131 .map(language::proto::deserialize_operation)
132 .collect::<Result<Vec<_>, _>>()?;
133
134 this.update(&mut cx, |this, cx| {
135 cx.notify();
136 this.buffer
137 .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
138 })?;
139
140 Ok(())
141 }
142
143 async fn handle_update_channel_buffer_collaborators(
144 this: ModelHandle<Self>,
145 message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
146 _: Arc<Client>,
147 mut cx: AsyncAppContext,
148 ) -> Result<()> {
149 this.update(&mut cx, |this, cx| {
150 this.replace_collaborators(message.payload.collaborators, cx);
151 cx.emit(ChannelBufferEvent::CollaboratorsChanged);
152 cx.notify();
153 });
154
155 Ok(())
156 }
157
158 fn on_buffer_update(
159 &mut self,
160 _: ModelHandle<language::Buffer>,
161 event: &language::Event,
162 _: &mut ModelContext<Self>,
163 ) {
164 if let language::Event::Operation(operation) = event {
165 let operation = language::proto::serialize_operation(operation);
166 self.client
167 .send(proto::UpdateChannelBuffer {
168 channel_id: self.channel.id,
169 operations: vec![operation],
170 })
171 .log_err();
172 }
173 }
174
175 pub fn epoch(&self) -> u64 {
176 self.buffer_epoch
177 }
178
179 pub fn buffer(&self) -> ModelHandle<language::Buffer> {
180 self.buffer.clone()
181 }
182
183 pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
184 &self.collaborators
185 }
186
187 pub fn channel(&self) -> Arc<Channel> {
188 self.channel.clone()
189 }
190
191 pub(crate) fn disconnect(&mut self, cx: &mut ModelContext<Self>) {
192 log::info!("channel buffer {} disconnected", self.channel.id);
193 if self.connected {
194 self.connected = false;
195 self.subscription.take();
196 cx.emit(ChannelBufferEvent::Disconnected);
197 cx.notify()
198 }
199 }
200
201 pub fn is_connected(&self) -> bool {
202 self.connected
203 }
204
205 pub fn replica_id(&self, cx: &AppContext) -> u16 {
206 self.buffer.read(cx).replica_id()
207 }
208}