1pub mod ai;
2pub mod api;
3pub mod auth;
4pub mod db;
5pub mod env;
6pub mod executor;
7mod rate_limiter;
8pub mod rpc;
9pub mod seed;
10
11#[cfg(test)]
12mod tests;
13
14use anyhow::anyhow;
15use aws_config::{BehaviorVersion, Region};
16use axum::{http::StatusCode, response::IntoResponse};
17use db::{ChannelId, Database};
18use executor::Executor;
19pub use rate_limiter::*;
20use serde::Deserialize;
21use std::{path::PathBuf, sync::Arc};
22use util::ResultExt;
23
24pub type Result<T, E = Error> = std::result::Result<T, E>;
25
26pub enum Error {
27 Http(StatusCode, String),
28 Database(sea_orm::error::DbErr),
29 Internal(anyhow::Error),
30}
31
32impl From<anyhow::Error> for Error {
33 fn from(error: anyhow::Error) -> Self {
34 Self::Internal(error)
35 }
36}
37
38impl From<sea_orm::error::DbErr> for Error {
39 fn from(error: sea_orm::error::DbErr) -> Self {
40 Self::Database(error)
41 }
42}
43
44impl From<axum::Error> for Error {
45 fn from(error: axum::Error) -> Self {
46 Self::Internal(error.into())
47 }
48}
49
50impl From<axum::http::Error> for Error {
51 fn from(error: axum::http::Error) -> Self {
52 Self::Internal(error.into())
53 }
54}
55
56impl From<serde_json::Error> for Error {
57 fn from(error: serde_json::Error) -> Self {
58 Self::Internal(error.into())
59 }
60}
61
62impl IntoResponse for Error {
63 fn into_response(self) -> axum::response::Response {
64 match self {
65 Error::Http(code, message) => {
66 log::error!("HTTP error {}: {}", code, &message);
67 (code, message).into_response()
68 }
69 Error::Database(error) => {
70 log::error!(
71 "HTTP error {}: {:?}",
72 StatusCode::INTERNAL_SERVER_ERROR,
73 &error
74 );
75 (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", &error)).into_response()
76 }
77 Error::Internal(error) => {
78 log::error!(
79 "HTTP error {}: {:?}",
80 StatusCode::INTERNAL_SERVER_ERROR,
81 &error
82 );
83 (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", &error)).into_response()
84 }
85 }
86 }
87}
88
89impl std::fmt::Debug for Error {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 match self {
92 Error::Http(code, message) => (code, message).fmt(f),
93 Error::Database(error) => error.fmt(f),
94 Error::Internal(error) => error.fmt(f),
95 }
96 }
97}
98
99impl std::fmt::Display for Error {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 match self {
102 Error::Http(code, message) => write!(f, "{code}: {message}"),
103 Error::Database(error) => error.fmt(f),
104 Error::Internal(error) => error.fmt(f),
105 }
106 }
107}
108
109impl std::error::Error for Error {}
110
111#[derive(Deserialize)]
112pub struct Config {
113 pub http_port: u16,
114 pub database_url: String,
115 pub migrations_path: Option<PathBuf>,
116 pub seed_path: Option<PathBuf>,
117 pub database_max_connections: u32,
118 pub api_token: String,
119 pub clickhouse_url: Option<String>,
120 pub clickhouse_user: Option<String>,
121 pub clickhouse_password: Option<String>,
122 pub clickhouse_database: Option<String>,
123 pub invite_link_prefix: String,
124 pub live_kit_server: Option<String>,
125 pub live_kit_key: Option<String>,
126 pub live_kit_secret: Option<String>,
127 pub rust_log: Option<String>,
128 pub log_json: Option<bool>,
129 pub blob_store_url: Option<String>,
130 pub blob_store_region: Option<String>,
131 pub blob_store_access_key: Option<String>,
132 pub blob_store_secret_key: Option<String>,
133 pub blob_store_bucket: Option<String>,
134 pub zed_environment: Arc<str>,
135 pub openai_api_key: Option<Arc<str>>,
136 pub google_ai_api_key: Option<Arc<str>>,
137 pub anthropic_api_key: Option<Arc<str>>,
138 pub zed_client_checksum_seed: Option<String>,
139 pub slack_panics_webhook: Option<String>,
140 pub auto_join_channel_id: Option<ChannelId>,
141}
142
143impl Config {
144 pub fn is_development(&self) -> bool {
145 self.zed_environment == "development".into()
146 }
147}
148
149pub struct AppState {
150 pub db: Arc<Database>,
151 pub live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
152 pub blob_store_client: Option<aws_sdk_s3::Client>,
153 pub rate_limiter: Arc<RateLimiter>,
154 pub executor: Executor,
155 pub clickhouse_client: Option<clickhouse::Client>,
156 pub config: Config,
157}
158
159impl AppState {
160 pub async fn new(config: Config, executor: Executor) -> Result<Arc<Self>> {
161 let mut db_options = db::ConnectOptions::new(config.database_url.clone());
162 db_options.max_connections(config.database_max_connections);
163 let mut db = Database::new(db_options, Executor::Production).await?;
164 db.initialize_notification_kinds().await?;
165
166 let live_kit_client = if let Some(((server, key), secret)) = config
167 .live_kit_server
168 .as_ref()
169 .zip(config.live_kit_key.as_ref())
170 .zip(config.live_kit_secret.as_ref())
171 {
172 Some(Arc::new(live_kit_server::api::LiveKitClient::new(
173 server.clone(),
174 key.clone(),
175 secret.clone(),
176 )) as Arc<dyn live_kit_server::api::Client>)
177 } else {
178 None
179 };
180
181 let db = Arc::new(db);
182 let this = Self {
183 db: db.clone(),
184 live_kit_client,
185 blob_store_client: build_blob_store_client(&config).await.log_err(),
186 rate_limiter: Arc::new(RateLimiter::new(db)),
187 executor,
188 clickhouse_client: config
189 .clickhouse_url
190 .as_ref()
191 .and_then(|_| build_clickhouse_client(&config).log_err()),
192 config,
193 };
194 Ok(Arc::new(this))
195 }
196}
197
198async fn build_blob_store_client(config: &Config) -> anyhow::Result<aws_sdk_s3::Client> {
199 let keys = aws_sdk_s3::config::Credentials::new(
200 config
201 .blob_store_access_key
202 .clone()
203 .ok_or_else(|| anyhow!("missing blob_store_access_key"))?,
204 config
205 .blob_store_secret_key
206 .clone()
207 .ok_or_else(|| anyhow!("missing blob_store_secret_key"))?,
208 None,
209 None,
210 "env",
211 );
212
213 let s3_config = aws_config::defaults(BehaviorVersion::latest())
214 .endpoint_url(
215 config
216 .blob_store_url
217 .as_ref()
218 .ok_or_else(|| anyhow!("missing blob_store_url"))?,
219 )
220 .region(Region::new(
221 config
222 .blob_store_region
223 .clone()
224 .ok_or_else(|| anyhow!("missing blob_store_region"))?,
225 ))
226 .credentials_provider(keys)
227 .load()
228 .await;
229
230 Ok(aws_sdk_s3::Client::new(&s3_config))
231}
232
233fn build_clickhouse_client(config: &Config) -> anyhow::Result<clickhouse::Client> {
234 Ok(clickhouse::Client::default()
235 .with_url(
236 config
237 .clickhouse_url
238 .as_ref()
239 .ok_or_else(|| anyhow!("missing clickhouse_url"))?,
240 )
241 .with_user(
242 config
243 .clickhouse_user
244 .as_ref()
245 .ok_or_else(|| anyhow!("missing clickhouse_user"))?,
246 )
247 .with_password(
248 config
249 .clickhouse_password
250 .as_ref()
251 .ok_or_else(|| anyhow!("missing clickhouse_password"))?,
252 )
253 .with_database(
254 config
255 .clickhouse_database
256 .as_ref()
257 .ok_or_else(|| anyhow!("missing clickhouse_database"))?,
258 ))
259}