diff --git a/libavformat/rtmppkt.c b/libavformat/rtmppkt.c index 0982775135..378328de06 100644 --- a/libavformat/rtmppkt.c +++ b/libavformat/rtmppkt.c @@ -140,16 +140,17 @@ int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p, return ff_rtmp_packet_read_internal(h, p, chunk_size, prev_pkt, hdr); } -int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size, - RTMPPacket *prev_pkt, uint8_t hdr) +static int rtmp_packet_read_one_chunk(URLContext *h, RTMPPacket *p, + int chunk_size, RTMPPacket *prev_pkt, + uint8_t hdr) { - uint8_t t, buf[16]; - int channel_id, timestamp, size, offset = 0; + uint8_t buf[16]; + int channel_id, timestamp, size; uint32_t extra = 0; enum RTMPPacketType type; int written = 0; - int ret; + int ret, toread; written++; channel_id = hdr & 0x3F; @@ -198,37 +199,69 @@ int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size, if (hdr != RTMP_PS_TWELVEBYTES) timestamp += prev_pkt[channel_id].timestamp; - if ((ret = ff_rtmp_packet_create(p, channel_id, type, timestamp, - size)) < 0) - return ret; + if (!prev_pkt[channel_id].read) { + if ((ret = ff_rtmp_packet_create(p, channel_id, type, timestamp, + size)) < 0) + return ret; + p->read = written; + p->offset = 0; + prev_pkt[channel_id].ts_delta = timestamp - + prev_pkt[channel_id].timestamp; + prev_pkt[channel_id].timestamp = timestamp; + } else { + // previous packet in this channel hasn't completed reading + RTMPPacket *prev = &prev_pkt[channel_id]; + p->data = prev->data; + p->size = prev->size; + p->channel_id = prev->channel_id; + p->type = prev->type; + p->ts_delta = prev->ts_delta; + p->extra = prev->extra; + p->offset = prev->offset; + p->read = prev->read + written; + p->timestamp = prev->timestamp; + prev->data = NULL; + } p->extra = extra; // save history prev_pkt[channel_id].channel_id = channel_id; prev_pkt[channel_id].type = type; prev_pkt[channel_id].size = size; - prev_pkt[channel_id].ts_delta = timestamp - prev_pkt[channel_id].timestamp; - prev_pkt[channel_id].timestamp = timestamp; prev_pkt[channel_id].extra = extra; - while (size > 0) { - int toread = FFMIN(size, chunk_size); - if (ffurl_read_complete(h, p->data + offset, toread) != toread) { - ff_rtmp_packet_destroy(p); - return AVERROR(EIO); - } - size -= chunk_size; - offset += chunk_size; - written += chunk_size; - if (size > 0) { - if ((ret = ffurl_read_complete(h, &t, 1)) < 0) { // marker - ff_rtmp_packet_destroy(p); - return ret; - } - written++; - if (t != (0xC0 + channel_id)) - return -1; - } + size = size - p->offset; + + toread = FFMIN(size, chunk_size); + if (ffurl_read_complete(h, p->data + p->offset, toread) != toread) { + ff_rtmp_packet_destroy(p); + return AVERROR(EIO); + } + size -= toread; + p->read += toread; + p->offset += toread; + + if (size > 0) { + RTMPPacket *prev = &prev_pkt[channel_id]; + prev->data = p->data; + prev->read = p->read; + prev->offset = p->offset; + return AVERROR(EAGAIN); + } + + prev_pkt[channel_id].read = 0; // read complete; reset if needed + return p->read; +} + +int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size, + RTMPPacket *prev_pkt, uint8_t hdr) +{ + while (1) { + int ret = rtmp_packet_read_one_chunk(h, p, chunk_size, prev_pkt, hdr); + if (ret > 0 || ret != AVERROR(EAGAIN)) + return ret; + + if (ffurl_read(h, &hdr, 1) != 1) + return AVERROR(EIO); } - return written; } int ff_rtmp_packet_write(URLContext *h, RTMPPacket *pkt, diff --git a/libavformat/rtmppkt.h b/libavformat/rtmppkt.h index 96ca1b68e8..e9c7ea7e65 100644 --- a/libavformat/rtmppkt.h +++ b/libavformat/rtmppkt.h @@ -82,6 +82,8 @@ typedef struct RTMPPacket { uint32_t extra; ///< probably an additional channel ID used during streaming data uint8_t *data; ///< packet payload int size; ///< packet payload size + int offset; ///< amount of data read so far + int read; ///< amount read, including headers } RTMPPacket; /** diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c index edc433fd2a..e8aecfef67 100644 --- a/libavformat/rtmpproto.c +++ b/libavformat/rtmpproto.c @@ -2306,7 +2306,7 @@ static int get_packet(URLContext *s, int for_header) static int rtmp_close(URLContext *h) { RTMPContext *rt = h->priv_data; - int ret = 0; + int ret = 0, i, j; if (!rt->is_input) { rt->flv_data = NULL; @@ -2317,6 +2317,9 @@ static int rtmp_close(URLContext *h) } if (rt->state > STATE_HANDSHAKED) ret = gen_delete_stream(h, rt); + for (i = 0; i < 2; i++) + for (j = 0; j < RTMP_CHANNELS; j++) + ff_rtmp_packet_destroy(&rt->prev_pkt[i][j]); free_tracked_methods(rt); av_freep(&rt->flv_data);