






#import "MQTTDecoder.h"

#import "MQTTLog.h"

@interface MQTTDecoder() {
    void *QueueIdentityKey;
}

@property (nonatomic) NSMutableArray<NSInputStream *> *streams;

@end

@implementation MQTTDecoder

- (instancetype)init {
    self = [super init];
    self.state = MQTTDecoderStateInitializing;
    self.streams = [NSMutableArray arrayWithCapacity:5];
    self.queue = dispatch_get_main_queue();
    return self;
}

- (void)dealloc {
    [self close];
}

- (void)setQueue:(dispatch_queue_t)queue {
    _queue = queue;
    
    
    
    
    
    
    
    
    
    
    
    
    dispatch_queue_set_specific(_queue, &QueueIdentityKey, (__bridge void *)_queue, NULL);
}

- (void)decodeMessage:(NSData *)data {
    NSInputStream *stream = [NSInputStream inputStreamWithData:data];
    CFReadStreamRef readStream = (__bridge CFReadStreamRef)stream;
    CFReadStreamSetDispatchQueue(readStream, self.queue);
    [self openStream:stream];
}

- (void)openStream:(NSInputStream *)stream {
    [self.streams addObject:stream];
    stream.delegate = self;
    DDLogVerbose(@"[MQTTDecoder] #streams=%lu", (unsigned long)self.streams.count);
    if (self.streams.count == 1) {
        [stream open];
    }
}

- (void)open {
    self.state = MQTTDecoderStateDecodingHeader;
}

- (void)internalClose {
    if (self.streams) {
        for (NSInputStream *stream in self.streams) {
            [stream close];
            [stream setDelegate:nil];
        }
        [self.streams removeAllObjects];
    }
}

- (void)close {
    
    
    
    
    if (self.queue != dispatch_get_specific(&QueueIdentityKey)) {
        dispatch_sync(self.queue, ^{
            [self internalClose];
        });
    } else {
        [self internalClose];
    }
}

- (void)stream:(NSStream *)sender handleEvent:(NSStreamEvent)eventCode {
    
    
    
    
    MQTTDecoder *strongDecoder = self;
    (void)strongDecoder;
    
    NSInputStream *stream = (NSInputStream *)sender;
    
    if (eventCode & NSStreamEventOpenCompleted) {
        DDLogVerbose(@"[MQTTDecoder] NSStreamEventOpenCompleted");
    }
    
    if (eventCode & NSStreamEventHasBytesAvailable) {
        DDLogVerbose(@"[MQTTDecoder] NSStreamEventHasBytesAvailable");
        
        if (self.state == MQTTDecoderStateDecodingHeader) {
            UInt8 buffer;
            NSInteger n = [stream read:&buffer maxLength:1];
            if (n == -1) {
                self.state = MQTTDecoderStateConnectionError;
                [self.delegate decoder:self handleEvent:MQTTDecoderEventConnectionError error:stream.streamError];
            } else if (n == 1) {
                self.length = 0;
                self.lengthMultiplier = 1;
                self.state = MQTTDecoderStateDecodingLength;
                self.dataBuffer = [[NSMutableData alloc] init];
                [self.dataBuffer appendBytes:&buffer length:1];
                self.offset = 1;
                DDLogVerbose(@"[MQTTDecoder] fixedHeader=0x%02x", buffer);
            }
        }
        while (self.state == MQTTDecoderStateDecodingLength) {
            
            UInt8 digit;
            NSInteger n = [stream read:&digit maxLength:1];
            if (n == -1) {
                self.state = MQTTDecoderStateConnectionError;
                [self.delegate decoder:self handleEvent:MQTTDecoderEventConnectionError error:stream.streamError];
                break;
            } else if (n == 0) {
                break;
            }
            DDLogVerbose(@"[MQTTDecoder] digit=0x%02x 0x%02x %d %d", digit, digit & 0x7f, (unsigned int)self.length, (unsigned int)self.lengthMultiplier);
            [self.dataBuffer appendBytes:&digit length:1];
            self.offset++;
            self.length += ((digit & 0x7f) * self.lengthMultiplier);
            if ((digit & 0x80) == 0x00) {
                self.state = MQTTDecoderStateDecodingData;
            } else {
                self.lengthMultiplier *= 128;
            }
        }
        DDLogVerbose(@"[MQTTDecoder] remainingLength=%d", (unsigned int)self.length);

        if (self.state == MQTTDecoderStateDecodingData) {
            if (self.length > 0) {
                NSInteger n, toRead;
                UInt8 buffer[768];
                toRead = self.length + self.offset - self.dataBuffer.length;
                if (toRead > sizeof buffer) {
                    toRead = sizeof buffer;
                }
                n = [stream read:buffer maxLength:toRead];
                if (n == -1) {
                    self.state = MQTTDecoderStateConnectionError;
                    [self.delegate decoder:self handleEvent:MQTTDecoderEventConnectionError error:stream.streamError];
                } else {
                    DDLogVerbose(@"[MQTTDecoder] read %ld %ld", (long)toRead, (long)n);
                    [self.dataBuffer appendBytes:buffer length:n];
                }
            }
            if (self.dataBuffer.length == self.length + self.offset) {
                DDLogVerbose(@"[MQTTDecoder] received (%lu)=%@...", (unsigned long)self.dataBuffer.length,
                                    [self.dataBuffer subdataWithRange:NSMakeRange(0, MIN(256, self.dataBuffer.length))]);
                [self.delegate decoder:self didReceiveMessage:self.dataBuffer];
                self.dataBuffer = nil;
                self.state = MQTTDecoderStateDecodingHeader;
            } else {
                DDLogWarn(@"[MQTTDecoder] oops received (%lu)=%@...", (unsigned long)self.dataBuffer.length,
                             [self.dataBuffer subdataWithRange:NSMakeRange(0, MIN(256, self.dataBuffer.length))]);
            }
        }
    }
    
    if (eventCode & NSStreamEventHasSpaceAvailable) {
        DDLogVerbose(@"[MQTTDecoder] NSStreamEventHasSpaceAvailable");
    }
    
    if (eventCode & NSStreamEventEndEncountered) {
        DDLogVerbose(@"[MQTTDecoder] NSStreamEventEndEncountered");
        
        if (self.streams) {
            [stream setDelegate:nil];
            [stream close];
            [self.streams removeObject:stream];
            if (self.streams.count) {
                NSInputStream *stream = (self.streams)[0];
                [stream open];
            }
        }
    }
    
    if (eventCode & NSStreamEventErrorOccurred) {
        DDLogVerbose(@"[MQTTDecoder] NSStreamEventErrorOccurred");
        
        self.state = MQTTDecoderStateConnectionError;
        NSError *error = stream.streamError;
        if (self.streams) {
            [self.streams removeObject:stream];
            if (self.streams.count) {
                NSInputStream *stream = (self.streams)[0];
                [stream open];
            }
        }
        [self.delegate decoder:self handleEvent:MQTTDecoderEventConnectionError error:error];
    }
}

@end
