1pub mod ai;
2pub mod api;
3pub mod auth;
4pub mod db;
5pub mod env;
6pub mod executor;
7mod rate_limiter;
8pub mod rpc;
9
10#[cfg(test)]
11mod tests;
12
13use anyhow::anyhow;
14use aws_config::{BehaviorVersion, Region};
15use axum::{http::StatusCode, response::IntoResponse};
16use db::{ChannelId, Database};
17use executor::Executor;
18pub use rate_limiter::*;
19use serde::Deserialize;
20use std::{path::PathBuf, sync::Arc};
21use util::ResultExt;
22
/// Convenience alias for `std::result::Result` whose error type defaults to [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
24
/// Error type used as the default `E` of this module's `Result` alias.
///
/// Every variant can be rendered as an HTTP response through the `IntoResponse`
/// implementation further down in this file.
pub enum Error {
    /// An error that carries its own HTTP status code and response message.
    Http(StatusCode, String),
    /// A database error produced by `sea_orm`; answered with a 500 status.
    Database(sea_orm::error::DbErr),
    /// Any other failure, wrapped in an `anyhow::Error`; answered with a 500 status.
    Internal(anyhow::Error),
}
30
31impl From<anyhow::Error> for Error {
32 fn from(error: anyhow::Error) -> Self {
33 Self::Internal(error)
34 }
35}
36
37impl From<sea_orm::error::DbErr> for Error {
38 fn from(error: sea_orm::error::DbErr) -> Self {
39 Self::Database(error)
40 }
41}
42
43impl From<axum::Error> for Error {
44 fn from(error: axum::Error) -> Self {
45 Self::Internal(error.into())
46 }
47}
48
49impl From<axum::http::Error> for Error {
50 fn from(error: axum::http::Error) -> Self {
51 Self::Internal(error.into())
52 }
53}
54
55impl From<serde_json::Error> for Error {
56 fn from(error: serde_json::Error) -> Self {
57 Self::Internal(error.into())
58 }
59}
60
61impl IntoResponse for Error {
62 fn into_response(self) -> axum::response::Response {
63 match self {
64 Error::Http(code, message) => {
65 log::error!("HTTP error {}: {}", code, &message);
66 (code, message).into_response()
67 }
68 Error::Database(error) => {
69 log::error!(
70 "HTTP error {}: {:?}",
71 StatusCode::INTERNAL_SERVER_ERROR,
72 &error
73 );
74 (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", &error)).into_response()
75 }
76 Error::Internal(error) => {
77 log::error!(
78 "HTTP error {}: {:?}",
79 StatusCode::INTERNAL_SERVER_ERROR,
80 &error
81 );
82 (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", &error)).into_response()
83 }
84 }
85 }
86}
87
88impl std::fmt::Debug for Error {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 match self {
91 Error::Http(code, message) => (code, message).fmt(f),
92 Error::Database(error) => error.fmt(f),
93 Error::Internal(error) => error.fmt(f),
94 }
95 }
96}
97
98impl std::fmt::Display for Error {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 match self {
101 Error::Http(code, message) => write!(f, "{code}: {message}"),
102 Error::Database(error) => error.fmt(f),
103 Error::Internal(error) => error.fmt(f),
104 }
105 }
106}
107
// `std::error::Error` requires `Debug + Display`, both of which are implemented by
// hand earlier in this file; none of the trait's provided methods are overridden.
impl std::error::Error for Error {}
109
/// Deserializable server configuration.
///
/// `AppState::new` takes ownership of a `Config` and uses it to set up the
/// database and the optional external clients.
#[derive(Deserialize)]
pub struct Config {
    pub http_port: u16,
    /// Passed to `db::ConnectOptions::new` when `AppState::new` sets up the database.
    pub database_url: String,
    /// Applied as `max_connections` on the database connect options.
    pub database_max_connections: u32,
    pub api_token: String,
    // ClickHouse settings. `AppState::new` only attempts to build a client when
    // `clickhouse_url` is set; building it then requires all four values.
    pub clickhouse_url: Option<String>,
    pub clickhouse_user: Option<String>,
    pub clickhouse_password: Option<String>,
    pub clickhouse_database: Option<String>,
    pub invite_link_prefix: String,
    // LiveKit settings. A LiveKit client is created only when all three are present.
    pub live_kit_server: Option<String>,
    pub live_kit_key: Option<String>,
    pub live_kit_secret: Option<String>,
    pub rust_log: Option<String>,
    pub log_json: Option<bool>,
    // Blob store settings. `build_blob_store_client` requires the URL, region,
    // access key and secret key; the bucket is not read anywhere in this file.
    pub blob_store_url: Option<String>,
    pub blob_store_region: Option<String>,
    pub blob_store_access_key: Option<String>,
    pub blob_store_secret_key: Option<String>,
    pub blob_store_bucket: Option<String>,
    /// Environment name; `Config::is_development` compares it with `"development"`.
    pub zed_environment: Arc<str>,
    pub openai_api_key: Option<Arc<str>>,
    pub google_ai_api_key: Option<Arc<str>>,
    pub zed_client_checksum_seed: Option<String>,
    pub slack_panics_webhook: Option<String>,
    pub auto_join_channel_id: Option<ChannelId>,
}
138
139impl Config {
140 pub fn is_development(&self) -> bool {
141 self.zed_environment == "development".into()
142 }
143}
144
/// Deserializable settings holding a database URL and an optional migrations path.
///
/// NOTE(review): not referenced elsewhere in this file; presumably consumed by a
/// migration entry point defined outside it — confirm against the caller.
#[derive(Default, Deserialize)]
pub struct MigrateConfig {
    pub database_url: String,
    pub migrations_path: Option<PathBuf>,
}
150
/// Application state assembled by `AppState::new` and returned inside an `Arc`.
pub struct AppState {
    /// Database handle; the same `Arc` is also given to the rate limiter.
    pub db: Arc<Database>,
    /// LiveKit client, present only when server, key and secret are all configured.
    pub live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
    /// S3 client for the blob store, or `None` if it could not be built from the config.
    pub blob_store_client: Option<aws_sdk_s3::Client>,
    /// Rate limiter constructed from the shared database handle.
    pub rate_limiter: Arc<RateLimiter>,
    /// The executor that was passed to `AppState::new`.
    pub executor: Executor,
    /// ClickHouse client; `None` when `clickhouse_url` is unset or the client could not be built.
    pub clickhouse_client: Option<clickhouse::Client>,
    /// The configuration this state was built from.
    pub config: Config,
}
160
161impl AppState {
162 pub async fn new(config: Config, executor: Executor) -> Result<Arc<Self>> {
163 let mut db_options = db::ConnectOptions::new(config.database_url.clone());
164 db_options.max_connections(config.database_max_connections);
165 let mut db = Database::new(db_options, Executor::Production).await?;
166 db.initialize_notification_kinds().await?;
167
168 let live_kit_client = if let Some(((server, key), secret)) = config
169 .live_kit_server
170 .as_ref()
171 .zip(config.live_kit_key.as_ref())
172 .zip(config.live_kit_secret.as_ref())
173 {
174 Some(Arc::new(live_kit_server::api::LiveKitClient::new(
175 server.clone(),
176 key.clone(),
177 secret.clone(),
178 )) as Arc<dyn live_kit_server::api::Client>)
179 } else {
180 None
181 };
182
183 let db = Arc::new(db);
184 let this = Self {
185 db: db.clone(),
186 live_kit_client,
187 blob_store_client: build_blob_store_client(&config).await.log_err(),
188 rate_limiter: Arc::new(RateLimiter::new(db)),
189 executor,
190 clickhouse_client: config
191 .clickhouse_url
192 .as_ref()
193 .and_then(|_| build_clickhouse_client(&config).log_err()),
194 config,
195 };
196 Ok(Arc::new(this))
197 }
198}
199
200async fn build_blob_store_client(config: &Config) -> anyhow::Result<aws_sdk_s3::Client> {
201 let keys = aws_sdk_s3::config::Credentials::new(
202 config
203 .blob_store_access_key
204 .clone()
205 .ok_or_else(|| anyhow!("missing blob_store_access_key"))?,
206 config
207 .blob_store_secret_key
208 .clone()
209 .ok_or_else(|| anyhow!("missing blob_store_secret_key"))?,
210 None,
211 None,
212 "env",
213 );
214
215 let s3_config = aws_config::defaults(BehaviorVersion::latest())
216 .endpoint_url(
217 config
218 .blob_store_url
219 .as_ref()
220 .ok_or_else(|| anyhow!("missing blob_store_url"))?,
221 )
222 .region(Region::new(
223 config
224 .blob_store_region
225 .clone()
226 .ok_or_else(|| anyhow!("missing blob_store_region"))?,
227 ))
228 .credentials_provider(keys)
229 .load()
230 .await;
231
232 Ok(aws_sdk_s3::Client::new(&s3_config))
233}
234
235fn build_clickhouse_client(config: &Config) -> anyhow::Result<clickhouse::Client> {
236 Ok(clickhouse::Client::default()
237 .with_url(
238 config
239 .clickhouse_url
240 .as_ref()
241 .ok_or_else(|| anyhow!("missing clickhouse_url"))?,
242 )
243 .with_user(
244 config
245 .clickhouse_user
246 .as_ref()
247 .ok_or_else(|| anyhow!("missing clickhouse_user"))?,
248 )
249 .with_password(
250 config
251 .clickhouse_password
252 .as_ref()
253 .ok_or_else(|| anyhow!("missing clickhouse_password"))?,
254 )
255 .with_database(
256 config
257 .clickhouse_database
258 .as_ref()
259 .ok_or_else(|| anyhow!("missing clickhouse_database"))?,
260 ))
261}