tcpcl.test package

Submodules

A dummy bundle data generator.

class tcpcl.test.bundlegen.TestBundleGen(methodName='runTest')

Bases: TestCase

testCrc16Itu()
testCrc32C()
tcpcl.test.bundlegen.binaryCborTag(item)

Encode CBOR as bytestring and tag the item.

Parameters:

item – The CBOR item to encode.

Returns:

The binary-enveloped item.

tcpcl.test.bundlegen.binaryCborseqTag(items)

Encode CBOR sequence as bytestring and tag the item.

Parameters:

items – The CBOR items to encode.

Returns:

The binary-enveloped item.

class tcpcl.test.bundlegen.Block(fields, crc_type_ix=None, crc_field_ix=None)

Bases: object

Represent an abstract block with CRC fields.

update_crc()
tcpcl.test.bundlegen.randtext(sizemax=100)
tcpcl.test.bundlegen.randbytes(sizemax=100)
tcpcl.test.bundlegen.randdtntime()

Generate a random DTN time. 20% of the time this will be the invalid/unknown time value.

tcpcl.test.bundlegen.randnodeid()

Generate a random Node ID. 50% of the time this will be a DTN URI. 50% of the time this will be an IPN URI.

tcpcl.test.bundlegen.randtimestamp()

Generate a random timestamp tuple.

tcpcl.test.bundlegen.randstatus()

Generate a random Bundle Status Report information tuple. 50% of the time this will include a time.

tcpcl.test.bundlegen.randcboritem(maxdepth=10)

Generate an arbitrary random CBOR data structure.

class tcpcl.test.bundlegen.Generator

Bases: object

A ‘bundle’ data generator.

BLOCK_NUM_PAYLOAD = 1
BLOCK_TYPE_PAYLOAD = 1
BLOCK_TYPE_BIB = 11
BLOCK_TYPE_BCB = 12
class BlockType(*values)

Bases: IntEnum

Non-primary block types.

PREV_NODE = 6
BUNDLE_AGE = 7
HOP_COUNT = 10
create_block_data(block_type, block_flags, bundle_flags)

Block-type-specific data gerator.

create_block_random(block_type, bundle_flags, unused_blocknum)
create_invalid_random()

Generate a purely random data.

Returns:

A single bundle file.

Return type:

file-like

create_invalid_cbor()

Generate a valid-CBOR content which is not really a bundle.

Returns:

A single bundle file.

Return type:

file-like

create_valid()

Generate a random, but structurally valid, encoded bundle.

Returns:

A single bundle file.

Return type:

file-like

tcpcl.test.bundlegen.bundle_iterable(genmode, gencount, indata)

A generator to yield encoded bundles as file-like objects.

tcpcl.test.bundlegen.agent_send_bundles(agent, contact, iterable)

A glib callback to send a sequence of bundles and then shutdown the agent.

Parameters:

iterable – An iterable object which produces file-like bundles.

tcpcl.test.bundlegen.main()

A dummy PCAP generator to exercise wireshark dissector.

class tcpcl.test.messagegen.Worker(args)

Bases: object

CHUNK_SIZE = 102400
AVAIL_MSGS = [<class 'tcpcl.messages.SessionInit'>, <class 'tcpcl.messages.SessionTerm'>, <class 'tcpcl.messages.TransferSegment'>, <class 'tcpcl.messages.TransferAck'>, <class 'tcpcl.messages.TransferRefuse'>, <class 'tcpcl.messages.Keepalive'>, <class 'tcpcl.messages.RejectMsg'>]
set_on_stop(func)

Set a callback to be run when this agent is stopped.

Parameters:

func – The callback, which takes no arguments.

start()
stop()
tcpcl.test.messagegen.main()
class tcpcl.test.test_contact.TestContact(methodName='runTest')

Bases: TestCase

testSerialize()
testDeserialize()
class tcpcl.test.test_formats.TestUInt16Field(methodName='runTest')

Bases: TestCase

Verify UInt16Field class

class DummyPacket(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: Packet

fields_desc: List[AnyField] = [<UInt16Field (DummyPacket).attr>]
aliastypes = [<class 'tcpcl.test.test_formats.TestUInt16Field.DummyPacket'>, <class 'scapy.packet.Packet'>]
testSerialize()
testDeserialize()
class tcpcl.test.test_formats.TestSdnvField(methodName='runTest')

Bases: TestCase

Verify SdnvField class

class DummyPacket(_pkt, /, *, attr=None)

Bases: Packet

fields_desc: List[AnyField] = [<SdnvField (DummyPacket).attr>]
aliastypes = [<class 'tcpcl.test.test_formats.TestSdnvField.DummyPacket'>, <class 'scapy.packet.Packet'>]
testSerialize()
testDeserialize()
class tcpcl.test.test_messages.TestSessionInit(methodName='runTest')

Bases: TestCase

testSerializeDefault()
testSerializeNoExt()
testDeserializeNoExt()
testSerializeEmptyExt()
testDeserializeEmptyExt()
class tcpcl.test.test_messages.TestSessionTerm(methodName='runTest')

Bases: TestCase

testSerializeNoReason()
testDeserializeNoReason()
testSerializeWithReason()
testDeserializeWithReason()
testSerializeAck()
testDeserializeAck()
class tcpcl.test.test_messages.TestKeepalive(methodName='runTest')

Bases: TestCase

testSerialize()
testDeserialize()
class tcpcl.test.test_messages.TestRejectMsg(methodName='runTest')

Bases: TestCase

testSerialize()
testDeserialize()
class tcpcl.test.test_messages.TestTransferRefuse(methodName='runTest')

Bases: TestCase

testSerialize()
testDeserialize()
class tcpcl.test.test_messages.TestTransferSegment(methodName='runTest')

Bases: TestCase

testSerializeStartNoExt()
testDeserializeStartNoExt()
testSerializeStartLengthExt()
testDeserializeStartLengthExt()
testSerializeMidData()
testDeserializeMidData()
testSerializeEndData()
testDeserializeEndData()
class tcpcl.test.test_messages.TestTransferAck(methodName='runTest')

Bases: TestCase

testSerialize()
testDeserialize()