use failure::format_err;
use futures::prelude::*;
use futures::sync::{mpsc, oneshot};
use jsonrpc_core::Id;
use jsonrpc_pubsub::SubscriptionId;
use log::debug;
use serde_json::Value;
use std::collections::HashMap;
use std::collections::VecDeque;
use super::RequestBuilder;
use crate::{RpcChannel, RpcError, RpcMessage};
struct Subscription {
	
	id: Option<SubscriptionId>,
	
	notification: String,
	
	unsubscribe: String,
	
	channel: mpsc::Sender<Result<Value, RpcError>>,
}
impl Subscription {
	fn new(channel: mpsc::Sender<Result<Value, RpcError>>, notification: String, unsubscribe: String) -> Self {
		Subscription {
			id: None,
			notification,
			unsubscribe,
			channel,
		}
	}
}
enum PendingRequest {
	Call(oneshot::Sender<Result<Value, RpcError>>),
	Subscription(Subscription),
}
pub struct Duplex<TSink, TStream> {
	request_builder: RequestBuilder,
	
	channel: Option<mpsc::Receiver<RpcMessage>>,
	
	pending_requests: HashMap<Id, PendingRequest>,
	
	subscriptions: HashMap<(SubscriptionId, String), Subscription>,
	
	stream: TStream,
	
	incoming: VecDeque<(Id, Result<Value, RpcError>, Option<String>, Option<SubscriptionId>)>,
	
	outgoing: VecDeque<String>,
	
	sink: TSink,
}
impl<TSink, TStream> Duplex<TSink, TStream> {
	
	fn new(sink: TSink, stream: TStream, channel: mpsc::Receiver<RpcMessage>) -> Self {
		log::debug!("open");
		Duplex {
			request_builder: RequestBuilder::new(),
			channel: Some(channel),
			pending_requests: Default::default(),
			subscriptions: Default::default(),
			stream,
			incoming: Default::default(),
			outgoing: Default::default(),
			sink,
		}
	}
}
pub fn duplex<TSink, TStream>(sink: TSink, stream: TStream) -> (Duplex<TSink, TStream>, RpcChannel) {
	let (sender, receiver) = mpsc::channel(0);
	let client = Duplex::new(sink, stream, receiver);
	(client, sender.into())
}
impl<TSink, TStream> Future for Duplex<TSink, TStream>
where
	TSink: Sink<SinkItem = String, SinkError = RpcError>,
	TStream: Stream<Item = String, Error = RpcError>,
{
	type Item = ();
	type Error = RpcError;
	fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
		
		log::debug!("handle requests from client");
		loop {
			
			let channel = match self.channel.as_mut() {
				Some(channel) => channel,
				None => break,
			};
			let msg = match channel.poll() {
				Ok(Async::Ready(Some(msg))) => msg,
				Ok(Async::Ready(None)) => {
					
					
					self.channel.take();
					break;
				}
				Ok(Async::NotReady) => break,
				Err(()) => continue,
			};
			let request_str = match msg {
				RpcMessage::Call(msg) => {
					let (id, request_str) = self.request_builder.call_request(&msg);
					if self
						.pending_requests
						.insert(id.clone(), PendingRequest::Call(msg.sender))
						.is_some()
					{
						log::error!("reuse of request id {:?}", id);
					}
					request_str
				}
				RpcMessage::Subscribe(msg) => {
					let crate::Subscription {
						subscribe,
						subscribe_params,
						notification,
						unsubscribe,
					} = msg.subscription;
					let (id, request_str) = self.request_builder.subscribe_request(subscribe, subscribe_params);
					log::debug!("subscribing to {}", notification);
					let subscription = Subscription::new(msg.sender, notification, unsubscribe);
					if self
						.pending_requests
						.insert(id.clone(), PendingRequest::Subscription(subscription))
						.is_some()
					{
						log::error!("reuse of request id {:?}", id);
					}
					request_str
				}
				RpcMessage::Notify(msg) => self.request_builder.notification(&msg),
			};
			log::debug!("outgoing: {}", request_str);
			self.outgoing.push_back(request_str);
		}
		
		
		log::debug!("handle stream");
		loop {
			let response_str = match self.stream.poll() {
				Ok(Async::Ready(Some(response_str))) => response_str,
				Ok(Async::Ready(None)) => {
					
					
					
					debug!("connection closed");
					return Ok(Async::Ready(()));
				}
				Ok(Async::NotReady) => break,
				Err(err) => Err(err)?,
			};
			log::debug!("incoming: {}", response_str);
			
			let (id, result, method, sid) = super::parse_response(&response_str)?;
			log::debug!(
				"id: {:?} (sid: {:?}) result: {:?} method: {:?}",
				id,
				sid,
				result,
				method
			);
			self.incoming.push_back((id, result, method, sid));
		}
		
		log::debug!("handle incoming");
		loop {
			match self.incoming.pop_front() {
				Some((id, result, method, sid)) => {
					let sid_and_method = sid.and_then(|sid| method.map(|method| (sid, method)));
					
					match self.pending_requests.remove(&id) {
						
						Some(PendingRequest::Call(tx)) => {
							tx.send(result)
								.map_err(|_| RpcError::Other(format_err!("oneshot channel closed")))?;
							continue;
						}
						
						
						Some(PendingRequest::Subscription(mut subscription)) => {
							let sid = result.as_ref().ok().and_then(|res| SubscriptionId::parse_value(res));
							let method = subscription.notification.clone();
							if let Some(sid) = sid {
								subscription.id = Some(sid.clone());
								if self
									.subscriptions
									.insert((sid.clone(), method.clone()), subscription)
									.is_some()
								{
									log::warn!(
										"Overwriting existing subscription under {:?} ({:?}). \
										 Seems that server returned the same subscription id.",
										sid,
										method,
									);
								}
							} else {
								let err = RpcError::Other(format_err!(
									"Subscription {:?} ({:?}) rejected: {:?}",
									id,
									method,
									result,
								));
								match subscription.channel.poll_ready() {
									Ok(Async::Ready(())) => {
										subscription
											.channel
											.try_send(result)
											.expect("The channel is ready; qed");
									}
									Ok(Async::NotReady) => {
										self.incoming.push_back((id, result, Some(method), sid));
										break;
									}
									Err(_) => {
										log::warn!("{}, but the reply channel has closed.", err);
									}
								};
							}
							continue;
						}
						
						None if sid_and_method.is_none() => {
							log::warn!("Got unexpected response with id {:?} ({:?})", id, sid_and_method);
							continue;
						}
						
						None => {}
					};
					let sid_and_method = if let Some(x) = sid_and_method {
						x
					} else {
						continue;
					};
					if let Some(subscription) = self.subscriptions.get_mut(&sid_and_method) {
						match subscription.channel.poll_ready() {
							Ok(Async::Ready(())) => {
								subscription
									.channel
									.try_send(result)
									.expect("The channel is ready; qed");
							}
							Ok(Async::NotReady) => {
								let (sid, method) = sid_and_method;
								self.incoming.push_back((id, result, Some(method), Some(sid)));
								break;
							}
							Err(_) => {
								let subscription = self
									.subscriptions
									.remove(&sid_and_method)
									.expect("Subscription was just polled; qed");
								let sid = subscription.id.expect(
									"Every subscription that ends up in `self.subscriptions` has id already \
									 assigned; assignment happens during response to subscribe request.",
								);
								let (_id, request_str) =
									self.request_builder.unsubscribe_request(subscription.unsubscribe, sid);
								log::debug!("outgoing: {}", request_str);
								self.outgoing.push_back(request_str);
								log::debug!("unsubscribed from {:?}", sid_and_method);
							}
						}
					} else {
						log::warn!("Received unexpected subscription notification: {:?}", sid_and_method);
					}
				}
				None => break,
			}
		}
		
		
		log::debug!("handle outgoing");
		loop {
			match self.outgoing.pop_front() {
				Some(request) => match self.sink.start_send(request)? {
					AsyncSink::Ready => {}
					AsyncSink::NotReady(request) => {
						self.outgoing.push_front(request);
						break;
					}
				},
				None => break,
			}
		}
		log::debug!("handle sink");
		let sink_empty = match self.sink.poll_complete()? {
			Async::Ready(()) => true,
			Async::NotReady => false,
		};
		log::debug!("{:?}", self);
		
		if self.channel.is_none()
			&& self.outgoing.is_empty()
			&& self.incoming.is_empty()
			&& self.pending_requests.is_empty()
			&& self.subscriptions.is_empty()
			&& sink_empty
		{
			log::debug!("close");
			Ok(Async::Ready(()))
		} else {
			Ok(Async::NotReady)
		}
	}
}
impl<TSink, TStream> std::fmt::Debug for Duplex<TSink, TStream> {
	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
		writeln!(f, "channel is none: {}", self.channel.is_none())?;
		writeln!(f, "outgoing: {}", self.outgoing.len())?;
		writeln!(f, "incoming: {}", self.incoming.len())?;
		writeln!(f, "pending_requests: {}", self.pending_requests.len())?;
		writeln!(f, "subscriptions: {}", self.subscriptions.len())?;
		Ok(())
	}
}