amqp-client源码解析1:数据格式
1. Frame
/**
 * One frame on the wire: a type byte, a two-byte channel number, a four-byte
 * payload length, the payload itself and a single end-marker byte.
 *
 * <p>A frame is in exactly one of two modes, fixed at construction:
 * <ul>
 *   <li><b>accumulating</b> ({@link #Frame(int, int)}): the payload is built up
 *       by writing to {@link #getOutputStream()};</li>
 *   <li><b>fixed payload</b> ({@link #Frame(int, int, byte[])}): the payload is
 *       the supplied array, as produced by {@link #readFrom(DataInputStream)}.</li>
 * </ul>
 */
public class Frame {
    /** Frame type code; written to the wire as a single byte. */
    public final int type;

    /** Channel number; written to the wire as a two-byte value. */
    public final int channel;

    /** Payload of a fixed-payload frame; {@code null} for an accumulating frame. */
    private final byte[] payload;

    /** Payload buffer of an accumulating frame; {@code null} for a fixed-payload frame. */
    private final ByteArrayOutputStream accumulator;

    /** Bytes in a frame besides the payload: type (1) + channel (2) + payload size (4) + end marker (1). */
    private static final int NON_BODY_SIZE = 1 + 2 + 4 + 1;

    /**
     * Creates an accumulating frame with an initially empty payload.
     *
     * @param type    frame type code
     * @param channel channel number
     */
    public Frame(int type, int channel) {
        this.type = type;
        this.channel = channel;
        this.payload = null;
        this.accumulator = new ByteArrayOutputStream();
    }

    /**
     * Creates a fixed-payload frame. The array is stored as-is, not copied.
     *
     * @param type    frame type code
     * @param channel channel number
     * @param payload frame payload
     */
    public Frame(int type, int channel, byte[] payload) {
        this.type = type;
        this.channel = channel;
        this.payload = payload;
        this.accumulator = null;
    }

    /**
     * Builds a body frame ({@code AMQP.FRAME_BODY}) carrying a slice of a message body.
     *
     * @param channelNumber channel the frame is sent on
     * @param body          array holding the message body
     * @param offset        index of the first byte of the slice
     * @param length        number of bytes in the slice
     * @return a new accumulating frame containing the slice
     * @throws IOException if writing to the frame's buffer fails
     */
    public static Frame fromBodyFragment(int channelNumber, byte[] body, int offset, int length) throws IOException {
        Frame frame = new Frame(AMQP.FRAME_BODY, channelNumber);
        DataOutputStream bodyOut = frame.getOutputStream();
        bodyOut.write(body, offset, length);
        return frame;
    }

    /**
     * Reads one complete frame from the stream.
     *
     * @param is stream to read from
     * @return the frame read, or {@code null} if reading the first byte timed out
     * @throws MalformedFrameException if the payload length is negative or the
     *                                 end marker is not {@code AMQP.FRAME_END}
     * @throws IOException             on any other read failure
     */
    public static Frame readFrom(DataInputStream is) throws IOException {
        int type;
        int channel;
        try {
            type = is.readUnsignedByte();
        } catch (SocketTimeoutException ste) {
            // Only a timeout on the very first byte is reported as "no frame";
            // timeouts further into the frame propagate to the caller.
            return null;
        }
        if (type == 'A') {
            // NOTE(review): presumably the first byte of an "AMQP" protocol header
            // sent by a peer that rejected our version, and the helper is expected
            // to throw -- confirm against protocolVersionMismatch.
            protocolVersionMismatch(is);
        }
        channel = is.readUnsignedShort();
        int payloadSize = is.readInt();
        if (payloadSize < 0) {
            // readInt() is signed: a length field with the high bit set cannot be
            // used as an array size, so report it as a protocol error rather than
            // letting the allocation fail with NegativeArraySizeException.
            throw new MalformedFrameException("Bad frame payload size: " + payloadSize);
        }
        byte[] payload = new byte[payloadSize];
        is.readFully(payload);
        int frameEndMarker = is.readUnsignedByte();
        if (frameEndMarker != AMQP.FRAME_END) {
            throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker);
        }
        return new Frame(type, channel, payload);
    }

    /**
     * Writes this frame (header, payload and end marker) to the stream.
     *
     * @param os stream to write to
     * @throws IOException if the underlying stream fails
     */
    public void writeTo(DataOutputStream os) throws IOException {
        os.writeByte(type);
        os.writeShort(channel);
        if (accumulator != null) {
            os.writeInt(accumulator.size());
            accumulator.writeTo(os);
        } else {
            os.writeInt(payload.length);
            os.write(payload);
        }
        os.write(AMQP.FRAME_END);
    }

    /**
     * @return the total number of bytes {@link #writeTo(DataOutputStream)} emits:
     *         the current payload length plus the fixed per-frame overhead
     */
    public int size() {
        if (accumulator != null) {
            return accumulator.size() + NON_BODY_SIZE;
        } else {
            return payload.length + NON_BODY_SIZE;
        }
    }

    /**
     * @return the payload array itself for a fixed-payload frame (not a copy),
     *         or a fresh copy of the bytes accumulated so far otherwise
     */
    public byte[] getPayload() {
        if (payload != null) return payload;
        return accumulator.toByteArray();
    }

    /** @return a stream positioned at the start of {@link #getPayload()} */
    public DataInputStream getInputStream() {
        return new DataInputStream(new ByteArrayInputStream(getPayload()));
    }

    /**
     * @return a stream that appends to this frame's payload buffer; the buffer
     *         is {@code null} for fixed-payload frames, so the returned stream
     *         is only usable on accumulating frames
     */
    public DataOutputStream getOutputStream() {
        return new DataOutputStream(accumulator);
    }
}
2. Method
2.1. Method接口
/**
 * Identifies a protocol method by its numeric class id, numeric method id and
 * textual name.
 */
public interface Method {

    /** @return the numeric id of the protocol class this method belongs to */
    int protocolClassId();

    /** @return the numeric id of this method within its protocol class */
    int protocolMethodId();

    /** @return the textual name of this method (for example {@code "basic.ack"}) */
    String protocolMethodName();
}
2.2. Method抽象类
/**
 * Base class for concrete protocol methods: adds serialisation into a
 * {@link Frame} on top of the identification provided by
 * {@code com.rabbitmq.client.Method}.
 */
public abstract class Method implements com.rabbitmq.client.Method {

    /** @return {@code true} if this method is followed by a content header and body */
    public abstract boolean hasContent();

    /**
     * Passes this method to the given visitor.
     *
     * @param visitor the visitor to call
     * @return whatever the visitor returns
     * @throws IOException if the visitor fails
     */
    public abstract Object visit(MethodVisitor visitor) throws IOException;

    /**
     * Serialises this method's arguments, without the class and method ids.
     *
     * @param writer destination for the arguments
     * @throws IOException if writing fails
     */
    public abstract void writeArgumentsTo(MethodArgumentWriter writer) throws IOException;

    /**
     * Serialises this method into a method frame ({@code AMQP.FRAME_METHOD}):
     * class id and method id as two-byte values, followed by the arguments.
     *
     * @param channelNumber channel the frame is sent on
     * @return the populated frame
     * @throws IOException if writing to the frame's buffer fails
     */
    public Frame toFrame(int channelNumber) throws IOException {
        Frame methodFrame = new Frame(AMQP.FRAME_METHOD, channelNumber);
        DataOutputStream payloadOut = methodFrame.getOutputStream();

        payloadOut.writeShort(protocolClassId());
        payloadOut.writeShort(protocolMethodId());

        MethodArgumentWriter arguments = new MethodArgumentWriter(new ValueWriter(payloadOut));
        writeArgumentsTo(arguments);
        // Flush the argument writer so everything it holds reaches the frame buffer.
        arguments.flush();

        return methodFrame;
    }
}
2.3. Ack
public static class Ack extends Method implements com.rabbitmq.client.AMQP.Basic.Ack {
public static final int INDEX = 80;
private final long deliveryTag;
private final boolean multiple;
public Ack(MethodArgumentReader rdr) throws IOException { this(rdr.readLonglong(), rdr.readBit()); }
public int protocolClassId() { return 60; }
public int protocolMethodId() { return 80; }
public String protocolMethodName() { return "basic.ack"; }
public boolean hasContent() { return false; }
public Object visit(MethodVisitor visitor) throws IOException { return visitor.visit(this); }
public void writeArgumentsTo(MethodArgumentWriter writer) throws IOException {
writer.writeLonglong(this.deliveryTag);
writer.writeBit(this.multiple);
}
}
3. ContentHeader
3.1. ContentHeader
/**
 * Public view of a content header: identifies the protocol class it belongs to
 * and can describe its properties as text.
 */
public interface ContentHeader extends Cloneable {

    /** @return the numeric id of the protocol class this header belongs to */
    int getClassId();

    /** @return the name of the protocol class this header belongs to */
    String getClassName();

    /**
     * Appends a textual rendering of this header's properties, for debugging.
     *
     * @param buffer builder to append to
     */
    void appendPropertyDebugStringTo(StringBuilder buffer);
}
3.2. AMQContentHeader
/**
 * Base class for content headers: holds the body size read from the wire and
 * knows how to serialise the header into a {@link Frame}.
 */
public abstract class AMQContentHeader implements ContentHeader {

    /** Body size decoded from an incoming header; 0 for headers created locally. */
    private final long bodySize;

    /** Creates a header with a body size of 0. */
    protected AMQContentHeader() {
        this.bodySize = 0;
    }

    /**
     * Decodes the fixed part of a content header.
     *
     * @param in stream positioned just after the class id (this constructor
     *           reads one short and one long, whereas {@link #toFrame} writes
     *           the class id first -- presumably the caller has consumed it)
     * @throws IOException if reading fails
     */
    protected AMQContentHeader(DataInputStream in) throws IOException {
        // Skip the 16-bit field that writeTo always emits as 0
        // (the "weight" field of the AMQP 0-9-1 content header).
        in.readShort();
        this.bodySize = in.readLong();
    }

    /**
     * @return the body size decoded from the wire, or 0 for a locally created
     *         header; callers use this to know how many body bytes to expect
     */
    public long getBodySize() {
        return bodySize;
    }

    /** Writes the zero short, the body size and then the properties. */
    private void writeTo(DataOutputStream out, long bodySize) throws IOException {
        out.writeShort(0);
        out.writeLong(bodySize);
        writePropertiesTo(new ContentHeaderPropertyWriter(out));
    }

    /**
     * Serialises the class-specific properties of this header.
     *
     * @param writer destination for the properties
     * @throws IOException if writing fails
     */
    public abstract void writePropertiesTo(ContentHeaderPropertyWriter writer) throws IOException;

    /**
     * Serialises this header into a header frame ({@code AMQP.FRAME_HEADER}).
     *
     * @param channelNumber channel the frame is sent on
     * @param bodySize      size of the body that follows; this argument, not
     *                      the value held by this object, is what gets written
     * @return the populated frame
     * @throws IOException if writing to the frame's buffer fails
     */
    public Frame toFrame(int channelNumber, long bodySize) throws IOException {
        Frame frame = new Frame(AMQP.FRAME_HEADER, channelNumber);
        DataOutputStream bodyOut = frame.getOutputStream();
        bodyOut.writeShort(getClassId());
        writeTo(bodyOut, bodySize);
        return frame;
    }
}
4. Command
4.1. Command
/**
 * A complete protocol command: a method plus, for methods that carry content,
 * a content header and a body.
 */
public interface Command {

    /** @return the method part of this command */
    Method getMethod();

    /** @return the content header part of this command */
    ContentHeader getContentHeader();

    /** @return the content body part of this command */
    byte[] getContentBody();
}
4.2. CommandAssembler
/**
 * Assembles a command (method, optional content header, optional body) from a
 * sequence of frames, tracking which kind of frame is expected next.
 *
 * <p>All public methods synchronise on this object.
 */
final class CommandAssembler {
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

    /** What the assembler needs next in order to complete the command. */
    private enum CAState {
        EXPECTING_METHOD,
        EXPECTING_CONTENT_HEADER,
        EXPECTING_CONTENT_BODY,
        COMPLETE
    }

    /** Current position in the method -> header -> body sequence. */
    private CAState state;

    /** Method of the command, once known. */
    private Method method;

    /** Content header of the command, once known. */
    private AMQContentHeader contentHeader;

    /** Body fragments in arrival order; collapsed to one array on first read. */
    private final List<byte[]> bodyN;

    /** Sum of the lengths of all fragments in {@link #bodyN}. */
    private int bodyLength;

    /** Body bytes still to arrive, according to the content header. */
    private long remainingBodyBytes;

    /**
     * Creates an assembler pre-populated with whatever parts are already known.
     * The initial state is derived from which arguments are {@code null}.
     *
     * @param method        method of the command, or {@code null} if not yet received
     * @param contentHeader content header, or {@code null} if not yet received
     * @param body          body bytes already available; {@code null} or empty for none
     */
    public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body) {
        this.method = method;
        this.contentHeader = contentHeader;
        this.bodyN = new ArrayList<byte[]>(2);
        this.bodyLength = 0;
        this.remainingBodyBytes = 0;
        appendBodyFragment(body);
        if (method == null) {
            this.state = CAState.EXPECTING_METHOD;
        } else if (contentHeader == null) {
            this.state = method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;
        } else {
            this.remainingBodyBytes = contentHeader.getBodySize() - this.bodyLength;
            updateContentBodyState();
        }
    }

    /** @return the method of the command, or {@code null} if none has been set or received */
    public synchronized Method getMethod() {
        return this.method;
    }

    /** @return the content header, or {@code null} if none has been set or received */
    public synchronized AMQContentHeader getContentHeader() {
        return this.contentHeader;
    }

    /**
     * @return the body received so far as a single array; never {@code null}
     *         (an empty array when there is no body)
     */
    public synchronized byte[] getContentBody() {
        return coalesceContentBody();
    }

    /** @return {@code true} once every expected frame of the command has been consumed */
    public synchronized boolean isComplete() {
        return this.state == CAState.COMPLETE;
    }

    /**
     * Feeds the next frame into the assembler.
     *
     * @param f the frame to consume
     * @return {@code true} if the command is complete after this frame
     * @throws IOException           if decoding the frame fails
     * @throws IllegalStateException if the command was already complete
     */
    public synchronized boolean handleFrame(Frame f) throws IOException {
        switch (this.state) {
            case EXPECTING_METHOD: consumeMethodFrame(f); break;
            case EXPECTING_CONTENT_HEADER: consumeHeaderFrame(f); break;
            case EXPECTING_CONTENT_BODY: consumeBodyFrame(f); break;
            default: throw new IllegalStateException("Bad Command State " + this.state);
        }
        return isComplete();
    }

    /** Records a body fragment; {@code null} and empty fragments are ignored. */
    private void appendBodyFragment(byte[] fragment) {
        if (fragment == null || fragment.length == 0) return;
        bodyN.add(fragment);
        bodyLength += fragment.length;
    }

    /** Moves to COMPLETE once no body bytes remain, otherwise keeps expecting body frames. */
    private void updateContentBodyState() {
        this.state = (this.remainingBodyBytes > 0) ? CAState.EXPECTING_CONTENT_BODY : CAState.COMPLETE;
    }

    /** Decodes the method and decides whether a content header must follow. */
    private void consumeMethodFrame(Frame f) throws IOException {
        if (f.type == AMQP.FRAME_METHOD) {
            this.method = AMQImpl.readMethodFrom(f.getInputStream());
            this.state = this.method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;
        } else {
            throw new UnexpectedFrameError(f, AMQP.FRAME_METHOD);
        }
    }

    /** Decodes the content header and learns how many body bytes to expect. */
    private void consumeHeaderFrame(Frame f) throws IOException {
        if (f.type == AMQP.FRAME_HEADER) {
            this.contentHeader = AMQImpl.readContentHeaderFrom(f.getInputStream());
            this.remainingBodyBytes = this.contentHeader.getBodySize();
            updateContentBodyState();
        } else {
            throw new UnexpectedFrameError(f, AMQP.FRAME_HEADER);
        }
    }

    /**
     * Appends one body fragment. A fragment that would exceed the size
     * announced by the content header is rejected before any state changes,
     * so a failed call never leaves the assembler reporting COMPLETE with a
     * truncated body.
     */
    private void consumeBodyFrame(Frame f) {
        if (f.type == AMQP.FRAME_BODY) {
            byte[] fragment = f.getPayload();
            long remainingAfter = this.remainingBodyBytes - fragment.length;
            if (remainingAfter < 0) {
                throw new UnsupportedOperationException("%%%%%% FIXME unimplemented");
            }
            this.remainingBodyBytes = remainingAfter;
            updateContentBodyState();
            appendBodyFragment(fragment);
        } else {
            throw new UnexpectedFrameError(f, AMQP.FRAME_BODY);
        }
    }

    /**
     * Joins all fragments into one array. When there is more than one
     * fragment, the joined array replaces them in the list, so repeated calls
     * do not copy again.
     */
    private byte[] coalesceContentBody() {
        if (this.bodyLength == 0) return EMPTY_BYTE_ARRAY;
        if (this.bodyN.size() == 1) return this.bodyN.get(0);
        byte[] body = new byte[bodyLength];
        int offset = 0;
        for (byte[] fragment : this.bodyN) {
            System.arraycopy(fragment, 0, body, offset, fragment.length);
            offset += fragment.length;
        }
        this.bodyN.clear();
        this.bodyN.add(body);
        return body;
    }
}
4.3. AMQCommand
/**
 * A {@link Command} backed by a {@link CommandAssembler}: it can be built up
 * from incoming frames or written out to a connection as a series of frames.
 */
public class AMQCommand implements Command {

    /** Per-frame overhead in bytes, subtracted from the frame-size limit to get the largest body payload. */
    public static final int EMPTY_FRAME_SIZE = 8;

    /** Holds the parts of the command; also the monitor guarding {@link #transmit}. */
    private final CommandAssembler assembler;

    /**
     * Creates a command from whatever parts are already known.
     *
     * @param method        method of the command; cast to the implementation type
     * @param contentHeader content header, if any
     * @param body          content body, if any
     */
    public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body) {
        this.assembler = new CommandAssembler((Method) method, contentHeader, body);
    }

    @Override
    public Method getMethod() {
        return this.assembler.getMethod();
    }

    @Override
    public AMQContentHeader getContentHeader() {
        return this.assembler.getContentHeader();
    }

    @Override
    public byte[] getContentBody() {
        return this.assembler.getContentBody();
    }

    /**
     * Feeds an incoming frame to the assembler.
     *
     * @param f the frame to consume
     * @return {@code true} if the command is complete after this frame
     * @throws IOException if decoding the frame fails
     */
    public boolean handleFrame(Frame f) throws IOException {
        return this.assembler.handleFrame(f);
    }

    /**
     * Writes this command to the channel's connection and flushes it.
     *
     * <p>A command without content is a single method frame. A command with
     * content is a method frame, a header frame and as many body frames as
     * needed to keep each within the connection's frame-size limit (a limit of
     * 0 or less means "unlimited": the whole body goes into one frame).
     *
     * @param channel channel to send on
     * @throws IOException              if writing to the connection fails
     * @throws IllegalArgumentException if the header frame alone exceeds the frame-size limit
     */
    public void transmit(AMQChannel channel) throws IOException {
        final int channelNumber = channel.getChannelNumber();
        final AMQConnection connection = channel.getConnection();

        synchronized (assembler) {
            final Method method = this.assembler.getMethod();

            if (!method.hasContent()) {
                connection.writeFrame(method.toFrame(channelNumber));
            } else {
                final byte[] content = this.assembler.getContentBody();
                final Frame headerFrame =
                        this.assembler.getContentHeader().toFrame(channelNumber, content.length);

                final int frameMax = connection.getFrameMax();
                final boolean cappedFrameMax = frameMax > 0;

                // Refuse before anything is written if the header cannot fit in one frame.
                if (cappedFrameMax && headerFrame.size() > frameMax) {
                    String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
                    throw new IllegalArgumentException(msg);
                }

                final int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : content.length;

                connection.writeFrame(method.toFrame(channelNumber));
                connection.writeFrame(headerFrame);

                int offset = 0;
                while (offset < content.length) {
                    int fragmentLength = Math.min(content.length - offset, bodyPayloadMax);
                    connection.writeFrame(Frame.fromBodyFragment(channelNumber, content, offset, fragmentLength));
                    offset += bodyPayloadMax;
                }
            }
        }

        connection.flush();
    }
}