







#import "MQTTSessionManager.h"
#import "MQTTCoreDataPersistence.h"
#import "MQTTLog.h"
#import "ReconnectTimer.h"
#import "ForegroundReconnection.h"
#import "MQTTSSLSecurityPolicyTransport.h"

@interface MQTTSessionManager()

@property (nonatomic, readwrite) MQTTSessionManagerState state;
@property (nonatomic, readwrite) NSError *lastErrorCode;

@property (strong, nonatomic) ReconnectTimer *reconnectTimer;
@property (nonatomic) BOOL reconnectFlag;

@property (strong, nonatomic) MQTTSession *session;

@property (strong, nonatomic) NSString *host;
@property (nonatomic) UInt32 port;
@property (nonatomic) BOOL tls;
@property (nonatomic) NSInteger keepalive;
@property (nonatomic) BOOL clean;
@property (nonatomic) BOOL auth;
@property (nonatomic) BOOL will;
@property (strong, nonatomic) NSString *user;
@property (strong, nonatomic) NSString *pass;
@property (strong, nonatomic) NSString *willTopic;
@property (strong, nonatomic) NSData *willMsg;
@property (nonatomic) NSInteger willQos;
@property (nonatomic) BOOL willRetainFlag;
@property (strong, nonatomic) NSString *clientId;
@property (strong, nonatomic) dispatch_queue_t queue;
@property (strong, nonatomic) MQTTSSLSecurityPolicy *securityPolicy;
@property (strong, nonatomic) NSArray *certificates;
@property (nonatomic) MQTTProtocolVersion protocolLevel;

#if TARGET_OS_IPHONE == 1
@property (strong, nonatomic) ForegroundReconnection *foregroundReconnection;
#endif

@property (nonatomic) BOOL persistent;
@property (nonatomic) NSUInteger maxWindowSize;
@property (nonatomic) NSUInteger maxSize;
@property (nonatomic) NSUInteger maxMessages;
@property (strong, nonatomic) NSString *streamSSLLevel;

@property (strong, nonatomic) NSDictionary<NSString *, NSNumber *> *internalSubscriptions;
@property (strong, nonatomic) NSDictionary<NSString *, NSNumber *> *effectiveSubscriptions;
@property (strong, nonatomic) NSLock *subscriptionLock;

@end

#define RECONNECT_TIMER 1.0
#define RECONNECT_TIMER_MAX_DEFAULT 64.0

@implementation MQTTSessionManager

- (instancetype)init {
    self = [self initWithPersistence:MQTT_PERSISTENT
                       maxWindowSize:MQTT_MAX_WINDOW_SIZE
                         maxMessages:MQTT_MAX_MESSAGES
                             maxSize:MQTT_MAX_SIZE
          maxConnectionRetryInterval:RECONNECT_TIMER_MAX_DEFAULT
                 connectInForeground:YES
                      streamSSLLevel:(NSString *)kCFStreamSocketSecurityLevelNegotiatedSSL
                               queue:dispatch_get_main_queue()];
    return self;
}

- (MQTTSessionManager *)initWithPersistence:(BOOL)persistent
                              maxWindowSize:(NSUInteger)maxWindowSize
                                maxMessages:(NSUInteger)maxMessages
                                    maxSize:(NSUInteger)maxSize
                 maxConnectionRetryInterval:(NSTimeInterval)maxRetryInterval
                        connectInForeground:(BOOL)connectInForeground
                             streamSSLLevel:(NSString *)streamSSLLevel
                                      queue:(dispatch_queue_t)queue {
    self = [super init];
    self.streamSSLLevel = streamSSLLevel;
    self.queue = queue;
    [self updateState:MQTTSessionManagerStateStarting];
    self.internalSubscriptions = [[NSMutableDictionary alloc] init];
    self.effectiveSubscriptions = [[NSMutableDictionary alloc] init];
    self.persistent = persistent;
    self.maxWindowSize = maxWindowSize;
    self.maxSize = maxSize;
    self.maxMessages = maxMessages;
    
    __weak MQTTSessionManager *weakSelf = self;
    self.reconnectTimer = [[ReconnectTimer alloc] initWithRetryInterval:RECONNECT_TIMER
                                                       maxRetryInterval:maxRetryInterval
                                                                  queue:self.queue
                                                         reconnectBlock:^{
        if ([self.delegate respondsToSelector:@selector(sessionManagerReconnect:)]) {
            [self.delegate sessionManagerReconnect:self];
        }else {
            [weakSelf reconnect:nil];
        }
    }];
#if TARGET_OS_IPHONE == 1
    if (connectInForeground) {
        self.foregroundReconnection = [[ForegroundReconnection alloc] initWithMQTTSessionManager:self];
    }
#endif
    self.subscriptionLock = [[NSLock alloc] init];
    
    return self;
}

- (void)connectTo:(NSString *)host
             port:(NSInteger)port
              tls:(BOOL)tls
        keepalive:(NSInteger)keepalive
            clean:(BOOL)clean
             auth:(BOOL)auth
             user:(NSString *)user
             pass:(NSString *)pass
             will:(BOOL)will
        willTopic:(NSString *)willTopic
          willMsg:(NSData *)willMsg
          willQos:(MQTTQosLevel)willQos
   willRetainFlag:(BOOL)willRetainFlag
     withClientId:(NSString *)clientId
   securityPolicy:(MQTTSSLSecurityPolicy *)securityPolicy
     certificates:(NSArray *)certificates
    protocolLevel:(MQTTProtocolVersion)protocolLevel
   connectHandler:(MQTTConnectHandler)connectHandler {
    DDLogVerbose(@"MQTTSessionManager connectTo:%@", host);
    BOOL shouldReconnect = self.session != nil;
    if (!self.session ||
        ![host isEqualToString:self.host] ||
        port != self.port ||
        tls != self.tls ||
        keepalive != self.keepalive ||
        clean != self.clean ||
        auth != self.auth ||
        ![user isEqualToString:self.user] ||
        ![pass isEqualToString:self.pass] ||
        ![willTopic isEqualToString:self.willTopic] ||
        ![willMsg isEqualToData:self.willMsg] ||
        willQos != self.willQos ||
        willRetainFlag != self.willRetainFlag ||
        ![clientId isEqualToString:self.clientId] ||
        securityPolicy != self.securityPolicy ||
        certificates != self.certificates) {
        self.host = host;
        self.port = (int)port;
        self.tls = tls;
        self.keepalive = keepalive;
        self.clean = clean;
        self.auth = auth;
        self.user = user;
        self.pass = pass;
        self.will = will;
        self.willTopic = willTopic;
        self.willMsg = willMsg;
        self.willQos = willQos;
        self.willRetainFlag = willRetainFlag;
        self.clientId = clientId;
        self.securityPolicy = securityPolicy;
        self.certificates = certificates;
        self.protocolLevel = protocolLevel;
        
        self.session = [[MQTTSession alloc] initWithClientId:clientId
                                                    userName:auth ? user : nil
                                                    password:auth ? pass : nil
                                                   keepAlive:keepalive
                                              connectMessage:nil
                                                cleanSession:clean
                                                        will:will
                                                   willTopic:willTopic
                                                     willMsg:willMsg
                                                     willQoS:willQos
                                              willRetainFlag:willRetainFlag
                                               protocolLevel:protocolLevel
                                                       queue:self.queue
                                              securityPolicy:securityPolicy
                                                certificates:certificates];
        self.session.streamSSLLevel = self.streamSSLLevel;
        MQTTCoreDataPersistence *persistence = [[MQTTCoreDataPersistence alloc] init];
        
        persistence.persistent = self.persistent;
        persistence.maxWindowSize = self.maxWindowSize;
        persistence.maxSize = self.maxSize;
        persistence.maxMessages = self.maxMessages;
        
        self.session.persistence = persistence;
        
        self.session.delegate = self;
        self.reconnectFlag = FALSE;
    }
    if (shouldReconnect) {
        DDLogVerbose(@"[MQTTSessionManager] reconnecting");
        [self disconnectWithDisconnectHandler:nil];
        [self reconnect:connectHandler];
    } else {
        DDLogVerbose(@"[MQTTSessionManager] connecting");
        [self connectToInternal:connectHandler];
    }
}

- (UInt16)sendData:(NSData *)data topic:(NSString *)topic qos:(MQTTQosLevel)qos retain:(BOOL)retainFlag {
    if (self.state != MQTTSessionManagerStateConnected) {
        [self connectToLast:nil];
    }
    UInt16 msgId = [self.session publishData:data
                                     onTopic:topic
                                      retain:retainFlag
                                         qos:qos];
    return msgId;
}

- (void)disconnectWithDisconnectHandler:(MQTTDisconnectHandler)disconnectHandler {
    [self updateState:MQTTSessionManagerStateClosing];
    [self.session closeWithDisconnectHandler:disconnectHandler];
    [self.reconnectTimer stop];
}

- (BOOL)requiresTearDown {
    return (self.state != MQTTSessionManagerStateClosed &&
            self.state != MQTTSessionManagerStateStarting);
}

- (void)updateState:(MQTTSessionManagerState)newState {
    self.state = newState;
    
    if ([self.delegate respondsToSelector:@selector(sessionManager:didChangeState:)]) {
        [self.delegate sessionManager:self didChangeState:newState];
    }
}




- (void)handleEvent:(MQTTSession *)session event:(MQTTSessionEvent)eventCode error:(NSError *)error {
#ifdef DEBUG
    __unused const NSDictionary *events = @{
                                            @(MQTTSessionEventConnected): @"connected",
                                            @(MQTTSessionEventConnectionRefused): @"connection refused",
                                            @(MQTTSessionEventConnectionClosed): @"connection closed",
                                            @(MQTTSessionEventConnectionError): @"connection error",
                                            @(MQTTSessionEventProtocolError): @"protocoll error",
                                            @(MQTTSessionEventConnectionClosedByBroker): @"connection closed by broker"
                                            };
    DDLogVerbose(@"[MQTTSessionManager] eventCode: %@ (%ld) %@", events[@(eventCode)], (long)eventCode, error);
#endif
    switch (eventCode) {
        case MQTTSessionEventConnected:
            self.lastErrorCode = nil;
            [self updateState:MQTTSessionManagerStateConnected];
            [self.reconnectTimer resetRetryInterval];
            break;
            
        case MQTTSessionEventConnectionClosed:
            [self updateState:MQTTSessionManagerStateClosed];
            break;
            
        case MQTTSessionEventConnectionClosedByBroker:
            if (self.state != MQTTSessionManagerStateClosing) {
                [self triggerDelayedReconnect];
            }
            [self updateState:MQTTSessionManagerStateClosed];
            break;
            
        case MQTTSessionEventProtocolError:
        case MQTTSessionEventConnectionRefused:
        case MQTTSessionEventConnectionError:
            [self triggerDelayedReconnect];
            self.lastErrorCode = error;
            [self updateState:MQTTSessionManagerStateError];
            break;
            
        default:
            break;
    }
}

- (void)newMessage:(MQTTSession *)session data:(NSData *)data onTopic:(NSString *)topic qos:(MQTTQosLevel)qos retained:(BOOL)retained mid:(unsigned int)mid {
    if (self.delegate) {
        if ([self.delegate respondsToSelector:@selector(sessionManager:didReceiveMessage:onTopic:retained:)]) {
            [self.delegate sessionManager:self didReceiveMessage:data onTopic:topic retained:retained];
        }
        if ([self.delegate respondsToSelector:@selector(handleMessage:onTopic:retained:)]) {
            [self.delegate handleMessage:data onTopic:topic retained:retained];
        }
    }
}

- (void)connected:(MQTTSession *)session sessionPresent:(BOOL)sessionPresent {
    if (self.clean || !self.reconnectFlag || !sessionPresent) {
        NSDictionary *subscriptions = [self.internalSubscriptions copy];
        [self.subscriptionLock lock];
        self.effectiveSubscriptions = [[NSMutableDictionary alloc] init];
        [self.subscriptionLock unlock];
        if (subscriptions.count) {
            __weak MQTTSessionManager *weakSelf = self;
            [self.session subscribeToTopics:subscriptions subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss) {
                MQTTSessionManager *strongSelf = weakSelf;
                if (!error) {
                    NSArray<NSString *> *allTopics = subscriptions.allKeys;
                    for (int i = 0; i < allTopics.count; i++) {
                        NSString *topic = allTopics[i];
                        NSNumber *gQos = gQoss[i];
                        [strongSelf.subscriptionLock lock];
                        NSMutableDictionary *newEffectiveSubscriptions = [strongSelf.subscriptions mutableCopy];
                        newEffectiveSubscriptions[topic] = gQos;
                        strongSelf.effectiveSubscriptions = newEffectiveSubscriptions;
                        [strongSelf.subscriptionLock unlock];
                    }
                }
            }];
            
        }
        self.reconnectFlag = TRUE;
    }
}

- (void)messageDelivered:(MQTTSession *)session msgID:(UInt16)msgID {
    if (self.delegate) {
        if ([self.delegate respondsToSelector:@selector(sessionManager:didDeliverMessage:)]) {
            [self.delegate sessionManager:self didDeliverMessage:msgID];
        }
        if ([self.delegate respondsToSelector:@selector(messageDelivered:)]) {
            [self.delegate messageDelivered:msgID];
        }
    }
}


- (void)connectToInternal:(MQTTConnectHandler)connectHandler {
    if (self.session && self.state == MQTTSessionManagerStateStarting) {
        [self updateState:MQTTSessionManagerStateConnecting];
        MQTTCFSocketTransport *transport;
        if (self.securityPolicy) {
            transport = [[MQTTSSLSecurityPolicyTransport alloc] init];
            ((MQTTSSLSecurityPolicyTransport *)transport).securityPolicy = self.securityPolicy;
        } else {
            transport = [[MQTTCFSocketTransport alloc] init];
        }
        transport.host = self.host;
        transport.port = self.port;
        transport.tls = self.tls;
        transport.certificates = self.certificates;
        transport.voip = self.session.voip;
        transport.queue = self.queue;
        transport.streamSSLLevel = self.streamSSLLevel;
        self.session.transport = transport;
        self.session.keepAliveInterval = self.keepalive;
        [self.session connectWithConnectHandler:connectHandler];
    }
}

- (void)reconnect:(MQTTConnectHandler)connectHandler {
    [self updateState:MQTTSessionManagerStateStarting];
    [self connectToInternal:connectHandler];
}

- (void)connectToLast:(MQTTConnectHandler)connectHandler {
    if (self.state == MQTTSessionManagerStateConnected) {
        return;
    }
    [self.reconnectTimer resetRetryInterval];
    [self reconnect:connectHandler];
}

- (void)updateSessionConfig:(NSString *)host
                      port:(NSInteger)port
                      user:(NSString *)user
                      pass:(NSString *)pass
                  clientId:(NSString *)clientId
                  keepalive:(NSInteger)keepalive {
    self.host = host;
    self.port = (int)port;
    self.session.userName = user;
    self.session.password = pass;
    self.session.clientId = clientId;
    self.session.keepAliveInterval = keepalive;
    [self reconnect:nil];
}

- (void)triggerDelayedReconnect {
    [self.reconnectTimer schedule];
}

- (NSDictionary<NSString *, NSNumber *> *)subscriptions {
    return self.internalSubscriptions;
}

- (void)setSubscriptions:(NSDictionary<NSString *, NSNumber *> *)newSubscriptions {
    if (self.state == MQTTSessionManagerStateConnected) {
        NSDictionary *currentSubscriptions = [self.effectiveSubscriptions copy];
        
        for (NSString *topicFilter in currentSubscriptions) {
            if (!newSubscriptions[topicFilter]) {
                __weak MQTTSessionManager *weakSelf = self;
                [self.session unsubscribeTopic:topicFilter unsubscribeHandler:^(NSError *error) {
                    MQTTSessionManager *strongSelf = weakSelf;
                    if (!error) {
                        [strongSelf.subscriptionLock lock];
                        NSMutableDictionary *newEffectiveSubscriptions = [strongSelf.subscriptions mutableCopy];
                        [newEffectiveSubscriptions removeObjectForKey:topicFilter];
                        strongSelf.effectiveSubscriptions = newEffectiveSubscriptions;
                        [strongSelf.subscriptionLock unlock];
                    }
                }];
            }
        }
        
        for (NSString *topicFilter in newSubscriptions) {
            if (!currentSubscriptions[topicFilter]) {
                NSNumber *number = newSubscriptions[topicFilter];
                MQTTQosLevel qos = number.unsignedIntValue;
                __weak MQTTSessionManager *weakSelf = self;
                [self.session subscribeToTopic:topicFilter atLevel:qos subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss) {
                    MQTTSessionManager *strongSelf = weakSelf;
                    if (!error) {
                        NSNumber *gQos = gQoss[0];
                        [strongSelf.subscriptionLock lock];
                        NSMutableDictionary *newEffectiveSubscriptions = [strongSelf.subscriptions mutableCopy];
                        newEffectiveSubscriptions[topicFilter] = gQos;
                        strongSelf.effectiveSubscriptions = newEffectiveSubscriptions;
                        [strongSelf.subscriptionLock unlock];
                    }
                }];
            }
        }
    }
    self.internalSubscriptions = newSubscriptions;
    DDLogVerbose(@"MQTTSessionManager internalSubscriptions: %@", self.internalSubscriptions);
}

@end
