Browse Source

feature: Add real sqlite backend to persist the pops.

Alexandre Leblanc 5 years atrás
parent
commit
b30ef2f137
5 changed files with 384 additions and 14 deletions
  1. 2 0
      .gitignore
  2. 84 0
      Cargo.lock
  3. 5 2
      Cargo.toml
  4. 247 0
      src/app_context.rs
  5. 46 12
      src/main.rs

+ 2 - 0
.gitignore

@@ -1 +1,3 @@
 /target
+pops.db
+test.db

+ 84 - 0
Cargo.lock

@@ -375,6 +375,9 @@ dependencies = [
  "chrono",
  "env_logger",
  "log",
+ "r2d2",
+ "r2d2_sqlite",
+ "rusqlite",
  "serde",
  "serde_derive",
  "serde_json",
@@ -549,6 +552,18 @@ dependencies = [
  "synstructure",
 ]
 
+[[package]]
+name = "fallible-iterator"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
+
+[[package]]
+name = "fallible-streaming-iterator"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
+
 [[package]]
 name = "flate2"
 version = "1.0.16"
@@ -869,6 +884,17 @@ version = "0.2.74"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a2f02823cf78b754822df5f7f268fb59822e7296276d3e069d8e8cb26a14bd10"
 
+[[package]]
+name = "libsqlite3-sys"
+version = "0.18.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e704a02bcaecd4a08b93a23f6be59d0bd79cd161e0963e9499165a0a35df7bd"
+dependencies = [
+ "cc",
+ "pkg-config",
+ "vcpkg",
+]
+
 [[package]]
 name = "linked-hash-map"
 version = "0.5.3"
@@ -1093,6 +1119,12 @@ version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
 
+[[package]]
+name = "pkg-config"
+version = "0.3.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d36492546b6af1463394d46f0c834346f31548646f6ba10849802c9c9a27ac33"
+
 [[package]]
 name = "ppv-lite86"
 version = "0.2.8"
@@ -1135,6 +1167,27 @@ dependencies = [
  "proc-macro2",
 ]
 
+[[package]]
+name = "r2d2"
+version = "0.8.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "545c5bc2b880973c9c10e4067418407a0ccaa3091781d1671d46eb35107cb26f"
+dependencies = [
+ "log",
+ "parking_lot",
+ "scheduled-thread-pool",
+]
+
+[[package]]
+name = "r2d2_sqlite"
+version = "0.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ed60ebe88b27ac28c0563bc0fbeaecd302ff53e3a01e5ddc2ec9f4e6c707d929"
+dependencies = [
+ "r2d2",
+ "rusqlite",
+]
+
 [[package]]
 name = "rand"
 version = "0.7.3"
@@ -1210,6 +1263,22 @@ dependencies = [
  "quick-error",
 ]
 
+[[package]]
+name = "rusqlite"
+version = "0.23.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "45d0fd62e1df63d254714e6cb40d0a0e82e7a1623e7a27f679d851af092ae58b"
+dependencies = [
+ "bitflags",
+ "fallible-iterator",
+ "fallible-streaming-iterator",
+ "libsqlite3-sys",
+ "lru-cache",
+ "memchr",
+ "smallvec",
+ "time",
+]
+
 [[package]]
 name = "rustc-demangle"
 version = "0.1.16"
@@ -1222,6 +1291,15 @@ version = "1.0.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
 
+[[package]]
+name = "scheduled-thread-pool"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dc6f74fd1204073fa02d5d5d68bec8021be4c38690b61264b2fdb48083d0e7d7"
+dependencies = [
+ "parking_lot",
+]
+
 [[package]]
 name = "scopeguard"
 version = "1.1.0"
@@ -1536,6 +1614,12 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "vcpkg"
+version = "0.2.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c"
+
 [[package]]
 name = "wasi"
 version = "0.9.0+wasi-snapshot-preview1"

+ 5 - 2
Cargo.toml

@@ -9,10 +9,13 @@ edition = "2018"
 [dependencies]
 actix-web = "2"
 actix-rt = "1"
-chrono = { version = "0.4.13",  features= ["serde"] }
+chrono = { version = "0.4.13",  features = ["serde"] }
 env_logger = "0.7"
 log = "0.4"
+rusqlite = { version = "0.23", features = ["bundled"] }
 serde = "1.0"
 serde_derive = "1.0"
 serde_json = "1.0"
-serde_repr = "0.1"
+serde_repr = "0.1"
+r2d2 = "0.8"
+r2d2_sqlite = "0.16"

+ 247 - 0
src/app_context.rs

@@ -0,0 +1,247 @@
+extern crate log;
+extern crate r2d2;
+extern crate r2d2_sqlite;
+
+use crate::broadsign::real_time_pop_request::RealTimePopRequest;
+use r2d2_sqlite::SqliteConnectionManager;
+use rusqlite::{params, Result};
+use std::sync::Arc;
+
+pub struct Database {
+    pool: Arc<r2d2::Pool<SqliteConnectionManager>>,
+}
+
+// Quick and simple AppContext that will contain the database context
+// so we can store the pops.
+pub struct AppContext {
+    pub database: Database,
+}
+
+impl Database {
+    pub fn from_sqlite(file_name: &'static str) -> Self {
+        let manager = SqliteConnectionManager::file(file_name);
+        let pool = Arc::new(r2d2::Pool::builder().build(manager).unwrap());
+
+        info!("Initializing pops database 'pops.db'.");
+
+        let conn = pool.get().unwrap();
+        let result = conn.execute_batch(
+            r#"
+        BEGIN;
+        create table if not exists api_users (
+            user_id integer primary key,
+            api_key text not null unique
+        );
+        create table if not exists pops (
+            pop_id integer primary key,
+            user_id integer not null,
+            player_id integer not null,
+            display_unit_id integer not null,
+            frame_id integer not null,
+            active_screens_count integer not null,
+            ad_copy_id integer not null,
+            schedule_id integer not null,
+            impressions integer not null,
+            interactions integer not null,
+            end_time integer not null,
+            duration_ms integer not null,
+            service_name text not null,
+            service_value text not null,
+            extra_data text not null,
+            FOREIGN KEY(user_id) REFERENCES api_users(user_id)
+        );
+        COMMIT;"#,
+        );
+
+        if let Err(e) = result {
+            error!("Could not create tables: {}", e);
+            panic!("Could not create tables.");
+        }
+
+        Database {
+            pool: Arc::clone(&pool),
+        }
+    }
+
+    pub fn user_exists(&self, api_key: &String) -> bool {
+        let pool_result = self.pool.get();
+
+        match pool_result {
+            Err(e) => {
+                error!("Could not get database connection: {}", e);
+                false
+            }
+            Ok(conn) => {
+                let result: Result<bool, _> = conn.query_row(
+                    "select 1 from api_users where api_key = ?1;",
+                    params![api_key],
+                    |row| row.get(0),
+                );
+
+                match result {
+                    Ok(exists) => exists,
+                    Err(e) => {
+                        error!("{}", e);
+                        false
+                    }
+                }
+            }
+        }
+    }
+
+    // Only present for the unit tests.
+    pub fn create_user(&self, api_key: &'static str) {
+        let pool_result = self.pool.get();
+
+        match pool_result {
+            Err(e) => {
+                error!("Could not get database connection: {}", e);
+            }
+            Ok(conn) => {
+                let result = conn.execute(
+                    "insert or ignore into api_users (api_key) values (?1);",
+                    params![api_key],
+                );
+
+                if let Err(e) = result {
+                    error!("{}", e);
+                }
+            }
+        }
+    }
+
+    pub fn store_pop(&self, pops: &RealTimePopRequest) -> bool {
+        let pool_result = self.pool.get();
+
+        match pool_result {
+            Err(e) => {
+                error!("Could not get database connection: {}", e);
+                return false;
+            }
+            Ok(mut conn) => {
+                if let Ok(tx) = conn.transaction() {
+                    for pop in &pops.pops {
+                        let result = tx.execute(
+                            r#"insert into pops (
+                        user_id,
+                        player_id,
+                        display_unit_id,
+                        frame_id,
+                        active_screens_count,
+                        ad_copy_id,
+                        schedule_id,
+                        impressions,
+                        interactions,
+                        end_time,
+                        duration_ms,
+                        service_name,
+                        service_value,
+                        extra_data)
+                        select
+                            user_id,
+                            ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13
+                        from
+                            api_users
+                        where
+                            api_key = ?14;"#,
+                            params![
+                                pops.player_id as i64,
+                                pop.display_unit_id as i64,
+                                pop.frame_id as i64,
+                                pop.active_screens_count as i32,
+                                pop.ad_copy_id as i64,
+                                pop.schedule_id as i64,
+                                pop.impressions as i32,
+                                pop.interactions as i32,
+                                pop.end_time.timestamp_millis(),
+                                pop.duration_ms as i32,
+                                pop.service_name,
+                                pop.service_value,
+                                pop.extra_data.as_str().unwrap(),
+                                pops.api_key
+                            ],
+                        );
+
+                        if let Err(err) = result {
+                            error!("{}", err);
+                            let _ = tx.rollback();
+                            return false;
+                        }
+                    }
+
+                    match tx.commit() {
+                        Ok(_) => return true,
+                        Err(e) => {
+                            error!("{}", e);
+                            return false;
+                        }
+                    }
+                }
+
+                return false;
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests_database {
+    use super::*;
+    use crate::broadsign::real_time_pop_request::{RealTimePopEntry, RealTimePopRequest};
+    use rusqlite::NO_PARAMS;
+    use serde_json::json;
+
+    fn ensure_user(db: &Database) {
+        let conn = db.pool.get().unwrap();
+        let r = conn.execute(
+            "insert or ignore into api_users (api_key) values ('some_secure_api_key')",
+            NO_PARAMS,
+        );
+
+        if let Err(e) = r {
+            panic!("{}", e);
+        }
+    }
+
+    #[actix_rt::test]
+    async fn given_initialization_step_do_not_fail() {
+        let _ = Database::from_sqlite("test.db");
+        assert!(true);
+    }
+
+    #[actix_rt::test]
+    async fn given_an_existing_api_key_user_exists_should_return_true() {
+        let db = Database::from_sqlite("test.db");
+        ensure_user(&db);
+        let exists = db.user_exists(&"some_secure_api_key".to_owned());
+
+        assert_eq!(exists, true);
+    }
+
+    #[actix_rt::test]
+    async fn given_a_valid_pop_request_should_succeed_to_insert() {
+        let db = Database::from_sqlite("test.db");
+        ensure_user(&db);
+        let result = db.store_pop(&RealTimePopRequest {
+            api_key: "some_secure_api_key".to_owned(),
+            player_id: 123456,
+            pops: vec![RealTimePopEntry {
+                display_unit_id: 123,
+                frame_id: 124,
+                active_screens_count: 2,
+                ad_copy_id: 56467,
+                campaign_id: 61000,
+                schedule_id: 61001,
+                impressions: 675,
+                interactions: 0,
+                end_time: chrono::NaiveDate::from_ymd(2017, 11, 23).and_hms_milli(13, 27, 12, 500),
+                duration_ms: 12996,
+                service_name: "bmb".to_owned(),
+                service_value: "701".to_owned(),
+                extra_data: json!(""),
+            }],
+        });
+
+        assert_eq!(result, true);
+    }
+}

+ 46 - 12
src/main.rs

@@ -1,28 +1,45 @@
 #[macro_use]
 extern crate log;
+mod app_context;
 mod broadsign;
 
 use actix_web::{middleware, web, App, HttpResponse, HttpServer};
+use app_context::{AppContext, Database};
 use broadsign::real_time_pop_request::RealTimePopRequest;
 
 // We keep authentication at its simplest form, but you could
 // return the api user informations through a Result<UserIdentity> mechanism.
-pub fn authenticate(api_key: &String) -> bool {
-    api_key == "some_secure_api_key"
+pub fn authenticate(app_context: &web::Data<AppContext>, api_key: &String) -> bool {
+    app_context.database.user_exists(api_key)
 }
 
 pub async fn status_get() -> HttpResponse {
     HttpResponse::Ok().finish()
 }
 
-pub async fn pop_post(pop_data: web::Json<RealTimePopRequest>) -> HttpResponse {
+pub async fn pop_post(
+    app_context: web::Data<AppContext>,
+    pop_data: web::Json<RealTimePopRequest>,
+) -> HttpResponse {
     let pop_data: RealTimePopRequest = pop_data.into_inner();
 
     debug!("Received pop submission:\n{:?}", pop_data);
 
-    if !authenticate(&pop_data.api_key) {
+    if !authenticate(&app_context, &pop_data.api_key) {
         error!("Pop submission refused for api key '{}'", &pop_data.api_key);
-        return HttpResponse::Unauthorized().finish();
+        return HttpResponse::Unauthorized().body(format!(
+            "Unauthorized access for api key '{}'",
+            &pop_data.api_key
+        ));
+    }
+
+    if !app_context.database.store_pop(&pop_data) {
+        error!(
+            "Failed to store {} pops for api key {}.",
+            pop_data.pops.len(),
+            pop_data.api_key
+        );
+        HttpResponse::InternalServerError().finish();
     }
 
     HttpResponse::Ok().finish()
@@ -38,11 +55,16 @@ async fn main() -> std::io::Result<()> {
     info!("=== Starting Real-Time Pop Service ===");
 
     HttpServer::new(move || {
-        App::new().wrap(middleware::Logger::default()).service(
-            web::scope("")
-                .route("/status", web::get().to(status_get))
-                .route("/pop", web::post().to(pop_post)),
-        )
+        App::new()
+            .data(AppContext {
+                database: Database::from_sqlite("pops.db"),
+            })
+            .wrap(middleware::Logger::default())
+            .service(
+                web::scope("")
+                    .route("/status", web::get().to(status_get))
+                    .route("/pop", web::post().to(pop_post)),
+            )
     })
     .bind("0.0.0.0:8080")?
     .run()
@@ -91,19 +113,31 @@ mod tests_endpoint_pop {
         }
     }
 
+    fn make_app_context() -> web::Data<AppContext> {
+        web::Data::<AppContext>::new(AppContext {
+            database: Database::from_sqlite("test.db"),
+        })
+    }
+
     #[actix_rt::test]
     async fn given_a_valid_pop_and_healthy_server_respond_ok() {
-        let resp = pop_post(web::Json(make_valid_pop_request())).await;
+        let app_context = make_app_context();
+        app_context.database.create_user("some_secure_api_key");
+
+        let resp = pop_post(app_context, web::Json(make_valid_pop_request())).await;
 
         assert_eq!(resp.status(), http::StatusCode::OK);
     }
 
     #[actix_rt::test]
     async fn given_an_invalid_api_key_server_responds_401_unauthorized() {
+        let app_context = make_app_context();
+        app_context.database.create_user("some_secure_api_key");
+
         let mut request = make_valid_pop_request();
         request.api_key = "some_invalid_api_key".to_owned();
 
-        let resp = pop_post(web::Json(request)).await;
+        let resp = pop_post(app_context, web::Json(request)).await;
 
         assert_eq!(resp.status(), http::StatusCode::UNAUTHORIZED);
     }