Initial commit

master
Mageas 2 years ago
commit d8939df172
Signed by: Mageas
GPG Key ID: B45836562531E7AD

4
.gitignore vendored

@ -0,0 +1,4 @@
.idea
/target
config.json
database.db3

2261
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -0,0 +1,19 @@
[package]
name = "matrix_piped_feed"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde = { version="1.0.130", features=["derive"] }
serde_json = "1.0.72"
tokio = { version="1.14.0", features=["full"] }
matrix-sdk = { version = "0.4.1", default-features = false, features=["native-tls"] }
rusqlite = "0.26.1"
isahc = "1.6.0"
feed-rs = "1.0.0"
chrono = "0.4.19"
regex = "1.5.4"
log = { version= "0.4.14", features=["release_max_level_warn"] }
simplelog = "0.11.0"

@ -0,0 +1,18 @@
{
"credentials": {
"user": "my_user",
"password": "my_password",
"homeserver": "https://matrix-client.matrix.org"
},
"fetch_interval": 300,
"fetch_between": {
"start": 8,
"end": 23
},
"subscriptions": [
{
"room": "matrix_room_id",
"feed": "piped_feed_uri"
}
]
}

@ -0,0 +1,101 @@
use serde::Deserialize;
use std::{collections::HashSet, hash::Hash};
#[derive(Deserialize, Debug)]
pub struct FetchBetween {
pub start: u8,
pub end: u8,
}
#[derive(Deserialize, Debug)]
pub struct Subscription {
pub room: String,
pub feed: String,
#[serde(skip)]
pub last_feed: Option<String>,
}
#[derive(Deserialize, Debug)]
pub struct Credentials {
pub user: String,
pub password: String,
pub homeserver: String,
}
#[derive(Deserialize, Debug)]
pub struct Config {
pub credentials: Credentials,
pub fetch_interval: i64,
pub fetch_between: Option<FetchBetween>,
pub subscriptions: Vec<Subscription>,
}
impl PartialEq for Subscription {
fn eq(&self, other: &Self) -> bool {
self.room == other.room && self.feed == other.feed
}
}
impl Config {
pub(crate) fn new(path: &str) -> Result<Config, String> {
let file = match std::fs::read_to_string(&path) {
Ok(f) => f,
Err(e) => return Err(e.to_string()),
};
let config = match serde_json::from_str::<Config>(&file) {
Ok(c) => c,
Err(e) => return Err(e.to_string()),
};
let keys = &config
.subscriptions
.iter()
.map(|s| s.room.as_ref())
.collect::<Vec<&str>>();
if !Config::has_unique_entries(&mut keys.into_iter()) {
return Err("duplicated room_id".to_string());
}
if !Config::has_valid_rooms(&config.subscriptions) {
return Err("invalid room".to_string());
}
if !Config::has_valid_feeds(&config.subscriptions) {
return Err("invalid feed".to_string());
}
Ok(config)
}
/// Check if the config has unique entries.
///
/// If the config has unique `room_id` keys,
/// the config is valid.
fn has_unique_entries<T>(iter: T) -> bool
where
T: IntoIterator,
T::Item: Eq + Hash,
{
let mut uniq = HashSet::new();
iter.into_iter().all(move |x| uniq.insert(x))
}
/// Check if the config has valid rooms.
fn has_valid_rooms(iter: &Vec<Subscription>) -> bool {
use matrix_sdk::ruma::RoomId;
iter.into_iter().all(|x| x.room.parse::<RoomId>().is_ok())
}
/// Check if the config has valid feeds.
fn has_valid_feeds(iter: &Vec<Subscription>) -> bool {
use regex::Regex;
iter.into_iter().all(|x| {
Regex::new(r"^https://pipedapi\.kavin\.rocks/feed/rss\?authToken=([0-9a-zA-Z-]*)$")
.unwrap()
.is_match(&x.feed)
})
}
}

@ -0,0 +1,103 @@
use crate::config::{Config, Subscription};
use rusqlite::{params, Connection, Error, Result};
use log::info;
#[derive(Debug)]
pub struct Database {
conn: Connection,
}
impl Database {
pub fn open(path: &str) -> Result<Database, Error> {
let db = Database {
conn: Connection::open(&path)?,
};
if let Err(e) = db.conn.execute(
"CREATE TABLE IF NOT EXISTS info (
id INTEGER PRIMARY KEY,
room TEXT NOT NULL,
feed TEXT NOT NULL,
last_feed TEXT
)",
[],
) {
return Err(e);
};
Ok(db)
}
/// Insert data to the database.
pub fn insert(&self, room: &str, feed: &str, last_feed: &Option<String>) -> Result<(), Error> {
let mut stmt = self
.conn
.prepare("INSERT INTO info (room, feed, last_feed) VALUES (?1, ?2, ?3)")?;
match stmt.execute(params![room, feed, last_feed]) {
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}
/// Remove data from database.
pub fn remove(&self, room: &str) -> Result<(), Error> {
let mut stmt = self.conn.prepare("DELETE FROM info WHERE room = ?")?;
match stmt.execute(params![room]) {
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}
/// Update data from database.
pub fn update(&self, room: &str, last_feed: &str) -> Result<(), Error> {
let mut stmt = self.conn.prepare(&format!(
"UPDATE info SET last_feed = '{}' WHERE room = ?",
last_feed
))?;
match stmt.execute(params![room]) {
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}
/// Get the data from the database.
pub fn get(&self) -> Result<Vec<Subscription>, Error> {
let mut stmt = self
.conn
.prepare("SELECT room, feed, last_feed from info")?;
let response = stmt.query_map([], |row| {
Ok(Subscription {
room: row.get(0)?,
feed: row.get(1)?,
last_feed: row.get(2)?,
})
})?;
Ok(response
.into_iter()
.filter_map(|f| f.ok())
.collect::<Vec<Subscription>>())
}
/// Sync the database with the config.
pub fn sync_with_config(&self, config: &Config) -> Result<(), Error> {
let feeds = self.get()?;
// Remove old subscriptions from the database
for sub in feeds.iter().filter(|s| !config.subscriptions.contains(&s)) {
info!("sync_with_config; remove {:?} form the database", &sub);
self.remove(&sub.room)?;
}
// Add missing subscriptions from the database
for sub in config.subscriptions.iter().filter(|s| !feeds.contains(&s)) {
info!("sync_with_config; insert {:?} to the database", &sub);
self.insert(&sub.room, &sub.feed, &sub.last_feed)?;
}
Ok(())
}
}

@ -0,0 +1,91 @@
use chrono::{prelude::Utc, Timelike};
use std::time::Duration;
use log::{error, info, LevelFilter};
use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
use std::process::exit;
use config::Config as AppConfig;
mod config;
use database::Database;
mod database;
mod matrix;
mod rss;
#[tokio::main]
async fn main() {
TermLogger::init(
LevelFilter::Info,
Config::default(),
TerminalMode::Stdout,
ColorChoice::Auto,
)
.unwrap();
let config_path = match std::env::var("MATRIX_PIPED_FEED__CONFIG") {
Ok(cp) => cp,
Err(_) => "config.json".to_string(),
};
let database_path = match std::env::var("MATRIX_PIPED_FEED__DATABASE") {
Ok(cp) => cp,
Err(_) => "database.db3".to_string(),
};
let config = match AppConfig::new(&config_path) {
Ok(c) => c,
Err(e) => {
error!("{}", e);
exit(1);
}
};
let client = match matrix::login_and_sync(
&config.credentials.user,
&config.credentials.password,
&config.credentials.homeserver,
)
.await
{
Ok(c) => c,
Err(e) => {
error!("{}", e);
exit(1);
}
};
info!(
"main; logged in as {:?}",
client.display_name().await.unwrap()
);
let db = Database::open(&database_path).unwrap();
if let Err(e) = db.sync_with_config(&config) {
error!("{}", e);
exit(1);
};
loop {
let now = Utc::now().hour() as u8 + 1_u8;
match &config.fetch_between {
Some(fb) => {
if now >= fb.start && now <= fb.end {
if let Err(e) = rss::update_rss(&db, &client).await {
error!("{}", e);
};
} else {
info!("main; not the time to fetch feeds");
}
}
None => {
if let Err(e) = rss::update_rss(&db, &client).await {
error!("{}", e);
};
()
}
}
std::thread::sleep(Duration::from_secs(config.fetch_interval as u64));
}
}

@ -0,0 +1,49 @@
use matrix_sdk::{
reqwest::Url,
ruma::{events::room::message::MessageEventContent, RoomId},
Client, SyncSettings,
};
use std::time::Duration;
use log::error;
/// Login and sync the session to matrix.
pub async fn login_and_sync(
user: &str,
password: &str,
homeserver: &str,
) -> Result<Client, String> {
let homeserver = match Url::parse(&homeserver) {
Ok(hs) => hs,
Err(e) => return Err(e.to_string()),
};
let client = Client::new(homeserver).unwrap();
if let Err(e) = client.login(user, password, None, None).await {
return Err(format!("{:?}", e));
};
let sync_settings = SyncSettings::new().timeout(Duration::from_secs(30));
if let Err(e) = client.sync_once(sync_settings).await {
return Err(format!("{:?}", e));
};
Ok(client)
}
/// Send a message to the room
pub async fn send_message(client: &Client, room: &str, message: &String, html_message: &String) {
//todo: Move this conversion in the config file
let room_id = room.parse::<RoomId>().unwrap();
if let Err(e) = client
.room_send(
&room_id,
MessageEventContent::text_html(message, html_message),
None,
)
.await
{
error!("send_message; {:?}", e);
};
}

@ -0,0 +1,88 @@
use crate::database::Database;
use crate::matrix;
use log::{info, warn};
use isahc::{self, ReadResponseExt};
use rusqlite::Error;
use feed_rs::{model::Entry, parser};
use matrix_sdk::Client;
pub async fn update_rss(db: &Database, client: &Client) -> Result<(), Error> {
let subscriptions = db.get()?;
for sub in subscriptions {
let mut response = match isahc::get(&sub.feed) {
Ok(res) => res,
Err(_) => {
warn!("update_rss; unable to contact {}", &sub.feed);
continue;
}
};
if response.status() != 200 {
warn!(
"update_rss; response code {} form {}",
response.status(),
&sub.feed
);
continue;
}
let xml = match response.text() {
Ok(xml) => xml,
Err(_) => {
warn!("update_rss; blank web page from {}", &sub.feed);
continue;
}
};
let feed = match parser::parse(xml.as_bytes()) {
Ok(f) => f,
Err(_) => {
warn!("update_rss; invalid rss feed from {}", &sub.feed);
continue;
}
};
let entries_to_update = match sub.last_feed {
Some(lf) => feed
.entries
.iter()
.rev()
.skip_while(|e| e.id != lf)
.skip(1)
.collect::<Vec<&Entry>>(),
None => feed.entries.iter().rev().collect::<Vec<&Entry>>(),
};
if entries_to_update.is_empty() {
info!("update_rss; nothing to update");
continue;
}
if let Err(e) = db.update(&sub.room, &entries_to_update.last().unwrap().id) {
warn!(
"update_rss; unable to update the database for {} ({})",
&sub.feed, e
);
continue;
}
for entry in entries_to_update.into_iter() {
let title = &entry.title.as_ref().unwrap().content;
let author = &entry.authors.first().as_ref().unwrap().name;
let uri = &entry.id;
let message = format!("{}\n{}\n{}", &title, &author, &uri);
let html_message = format!(
"<p></p><p><b>{}</b></br><i>{}</i></br>{}</p>",
&title, &author, &uri
);
info!("update_rss; new entry for ({}) {}", &title, &sub.feed);
matrix::send_message(&client, &sub.room, &message, &html_message).await;
}
}
Ok(())
}
Loading…
Cancel
Save