1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
//
// Copyright (C) 2018 Kubos Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

use file_protocol::{FileProtocol, FileProtocolConfig, ProtocolError, State};
use kubos_system::Config as ServiceConfig;
use log::warn;
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// We need this in this lib.rs file so we can build integration tests
pub fn recv_loop(config: &ServiceConfig) -> Result<(), failure::Error> {
    // Get and bind our UDP listening socket
    let host = config.hosturl();

    // Extract our local IP address so we can spawn child sockets later
    let mut host_parts = host.split(':').map(|val| val.to_owned());
    let host_ip = host_parts.next().unwrap();

    // Get the storage directory prefix that we'll be using for our
    // temporary/intermediate storage location
    let prefix = match config.get("storage_dir") {
        Some(val) => val.as_str().and_then(|str| Some(str.to_owned())),
        None => None,
    };

    // Get the chunk size to be used for transfers
    let chunk_size = match config.get("chunk_size") {
        Some(val) => val.as_integer().unwrap_or(4096),
        None => 4096,
    } as usize;

    let hold_count = match config.get("hold_count") {
        Some(val) => val.as_integer().unwrap_or(5),
        None => 5,
    } as u16;

    let f_config = FileProtocolConfig::new(prefix, chunk_size, hold_count);

    let c_protocol = cbor_protocol::Protocol::new(&host.clone(), chunk_size);

    let timeout = config
        .get("timeout")
        .and_then(|val| {
            val.as_integer()
                .and_then(|num| Some(Duration::from_secs(num as u64)))
        })
        .unwrap_or(Duration::from_secs(2));

    // Setup map of channel IDs to thread channels
    let raw_threads: HashMap<u32, Sender<serde_cbor::Value>> = HashMap::new();
    // Create thread sharable wrapper
    let threads = Arc::new(Mutex::new(raw_threads));

    loop {
        // Listen on UDP port
        let (source, first_message) = match c_protocol.recv_message_peer() {
            Ok((source, first_message)) => (source, first_message),
            Err(e) => {
                warn!("Error receiving message: {:?}", e);
                continue;
            }
        };

        let config_ref = f_config.clone();
        let host_ref = host_ip.clone();
        let timeout_ref = timeout;

        let channel_id = match file_protocol::parse_channel_id(&first_message) {
            Ok(channel_id) => channel_id,
            Err(e) => {
                warn!("Error parsing channel ID: {:?}", e);
                continue;
            }
        };

        if !threads.lock().unwrap().contains_key(&channel_id) {
            let (sender, receiver): (Sender<serde_cbor::Value>, Receiver<serde_cbor::Value>) =
                mpsc::channel();
            threads.lock().unwrap().insert(channel_id, sender.clone());
            // Break the processing work off into its own thread so we can
            // listen for requests from other clients
            let shared_threads = threads.clone();
            thread::spawn(move || {
                let state = State::Holding {
                    count: 0,
                    prev_state: Box::new(State::Done),
                };

                // Set up the file system processor with the reply socket information
                let f_protocol = FileProtocol::new(&host_ref, &format!("{}", source), config_ref);

                // Listen, process, and react to the remaining messages in the
                // requested operation
                if let Err(e) = f_protocol.message_engine(
                    |d| match receiver.recv_timeout(d) {
                        Ok(v) => Ok(v),
                        Err(RecvTimeoutError::Timeout) => Err(ProtocolError::ReceiveTimeout),
                        Err(e) => Err(ProtocolError::ReceiveError {
                            err: format!("Error {:?}", e),
                        }),
                    },
                    timeout_ref,
                    &state,
                ) {
                    warn!("Encountered errors while processing transaction: {}", e);
                }

                // Remove ourselves from threads list if we are finished
                shared_threads.lock().unwrap().remove(&channel_id);
            });
        }

        if let Some(sender) = threads.lock().unwrap().get(&channel_id) {
            if let Err(e) = sender.send(first_message) {
                warn!("Error when sending to channel {}: {:?}", channel_id, e);
            }
        }

        if !threads.lock().unwrap().contains_key(&channel_id) {
            warn!("No sender found for {}", channel_id);
            threads.lock().unwrap().remove(&channel_id);
        }
    }
}