bp.encoding package

Bundle encoding details.

Submodules

Administrative records and types.

class bp.encoding.admin.AdminRecord(_pkt, /, *, type_code=None)

Bases: TypeValueHead

An administrative record bundle payload of BPbis Section 6.1. This is handled specially because it needs a primary block flag to indicate its presence.

aliastypes = [<class 'bp.encoding.admin.AdminRecord'>, <class 'scapy_cbor.packets.TypeValueHead'>, <class 'scapy_cbor.packets.CborArray'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]
fields_desc: List[AnyField] = [<UintField (TypeValueHead,AdminRecord).type_code>]
payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({'type_code': 1}, <class 'bp.encoding.admin.StatusReport'>)]
class bp.encoding.admin.StatusInfo(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: CborArray

Each Status assertion of BPbis Section 6.1.1.

fields_desc: List[AnyField] = [<BoolField (StatusInfo).status>, <scapy_cbor.fields.OptionalField object>]
aliastypes = [<class 'bp.encoding.admin.StatusInfo'>, <class 'scapy_cbor.packets.CborArray'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]
class bp.encoding.admin.StatusInfoArray(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: CborArray

The Status assertions of BPbis Section 6.1.1.

fields_desc: List[AnyField] = [<PacketField (StatusInfoArray).received>, <PacketField (StatusInfoArray).forwarded>, <PacketField (StatusInfoArray).delivered>, <PacketField (StatusInfoArray).deleted>]
aliastypes = [<class 'bp.encoding.admin.StatusInfoArray'>, <class 'scapy_cbor.packets.CborArray'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]
class bp.encoding.admin.StatusReport(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: CborArray

The Status Report of BPbis Section 6.1.1.

class ReasonCode(*values)

Bases: IntEnum

NO_INFO = 0
LIFETIME_EXP = 1
FWD_UNI = 2
TX_CANCEL = 3
DEPLETE_STORAGE = 4
DEST_EID_UNINTEL = 5
NO_ROUTE = 6
NO_NEXT_CONTACT = 7
BLOCK_UNINTEL = 8
HOP_LIMIT_EXC = 9
TRAFIC_PAIRED = 10
MISSING_SEC = 12

Missing security operation

UNKNOWN_SEC = 13

Unknown Security Operation

UNEXPECT_SEC = 14

Unexpected security operation

FAILED_SEC = 15

Failed Security Operation

CONFLICT_SEC = 16

Conflicting security operation

aliastypes = [<class 'bp.encoding.admin.StatusReport'>, <class 'scapy_cbor.packets.CborArray'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]
fields_desc: List[AnyField] = [<PacketField (StatusReport).status>, <EnumField (StatusReport).reason_code>, <EidField (StatusReport).subj_source>, <PacketField (StatusReport).subj_ts>, <scapy_cbor.fields.OptionalField object>, <scapy_cbor.fields.OptionalField object>]

Base block and bundle encoding.

class bp.encoding.blocks.Timestamp(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: CborArray

A structured representation of an DTN Timestamp. The timestamp is a two-tuple of (time, sequence number) The creation time portion is automatically converted from a datetime.datetime object and text.

fields_desc: List[AnyField] = [<DtnTimeField (Timestamp).dtntime>, <UintField (Timestamp).seqno>]
aliastypes = [<class 'bp.encoding.blocks.Timestamp'>, <class 'scapy_cbor.packets.CborArray'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]
class bp.encoding.blocks.AbstractBlock(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: CborArray

Represent an abstract block with CRC fields.

class CrcType(*values)

Bases: IntEnum

CRC type values.

NONE = 0
CRC16 = 1
CRC32 = 2
CRC_DEFN = {CrcType.CRC16: {'encode': <function AbstractBlock.<lambda>>, 'func': <function _mkCrcFun.<locals>.crcfun>}, CrcType.CRC32: {'encode': <function AbstractBlock.<lambda>>, 'func': <function _mkCrcFun.<locals>.crcfun>}}
crc_type_name = 'crc_type'

The name of the CRC-type field.

crc_value_name = 'crc_value'

The name of the CRC-value field.

fill_fields()

Fill all fields so that the block is the full size it needs to be for encoding encoding with build(). Derived classes should populate their block-type-specific-data also.

update_crc(keep_existing=False)

Update this block’s CRC field from the current field data only if the current CRC (field not default) value is None.

check_crc()

Check the current CRC value, if enabled. :return: True if the CRC is disabled or it is valid.

aliastypes = [<class 'bp.encoding.blocks.AbstractBlock'>, <class 'scapy_cbor.packets.CborArray'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]
class bp.encoding.blocks.PrimaryBlock(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: AbstractBlock

The primary block definition

class Flag(*values)

Bases: IntFlag

Bundle flags.

NONE = 0
REQ_DELETION_REPORT = 262144
REQ_DELIVERY_REPORT = 131072
REQ_FORWARDING_REPORT = 65536
REQ_RECEPTION_REPORT = 16384
REQ_STATUS_TIME = 64
USER_APP_ACK = 32
NO_FRAGMENT = 4
PAYLOAD_ADMIN = 2
IS_FRAGMENT = 1
fields_desc: List[AnyField] = [<UintField (PrimaryBlock).bp_version>, <FlagsField (PrimaryBlock).bundle_flags>, <EnumField (PrimaryBlock).crc_type>, <EidField (PrimaryBlock).destination>, <EidField (PrimaryBlock).source>, <EidField (PrimaryBlock).report_to>, <PacketField (PrimaryBlock).create_ts>, <UintField (PrimaryBlock).lifetime>, <scapy.fields.ConditionalField object>, <scapy.fields.ConditionalField object>, <scapy.fields.ConditionalField object>]
aliastypes = [<class 'bp.encoding.blocks.PrimaryBlock'>, <class 'bp.encoding.blocks.AbstractBlock'>, <class 'scapy_cbor.packets.CborArray'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]
class bp.encoding.blocks.CanonicalBlock(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: AbstractBlock

The canonical block definition with a type-specific payload.

Any payload of this block is encoded as the “data” field when building and decoded from the “data” field when dissecting.

class Flag(*values)

Bases: IntFlag

Block flags. Flags must be in LSbit-first order.

NONE = 0
REMOVE_IF_NO_PROCESS = 16
DELETE_IF_NO_PROCESS = 4
STATUS_IF_NO_PROCESS = 2
REPLICATE_IN_FRAGMENT = 1
fields_desc: List[AnyField] = [<UintField (CanonicalBlock).type_code>, <UintField (CanonicalBlock).block_num>, <FlagsField (CanonicalBlock).block_flags>, <EnumField (CanonicalBlock).crc_type>, <BstrField (CanonicalBlock).btsd>, <scapy.fields.ConditionalField object>]
ensure_block_type_specific_data()

Embed payload as field data if not already present.

fill_fields()

Fill all fields so that the block is the full size it needs to be for encoding encoding with build(). Derived classes should populate their block-type-specific-data also.

self_build(*args, **kwargs)

Create the default layer regarding fields_desc dict

do_build_payload()

Create the default version of the payload layer

Returns:

a string of payload layer

post_dissect(s)

DEV: is called right after the current layer has been dissected

default_payload_class(payload)

DEV: Returns the default payload class if nothing has been found by the guess_payload_class() method.

Parameters:

payload (str) – the layer’s payload

Returns:

the default payload class define inside the configuration file

classmethod bind_type(type_code)

Bind a block-type-specific packet-class handler.

Parameters:

type_code (int) – The type to bind to the payload class.

aliastypes = [<class 'bp.encoding.blocks.CanonicalBlock'>, <class 'bp.encoding.blocks.AbstractBlock'>, <class 'scapy_cbor.packets.CborArray'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]
payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({'type_code': 6}, <class 'bp.encoding.blocks.PreviousNodeBlock'>), ({'type_code': 7}, <class 'bp.encoding.blocks.BundleAgeBlock'>), ({'type_code': 10}, <class 'bp.encoding.blocks.HopCountBlock'>), ({'type_code': 11}, <class 'bp.encoding.bpsec.BlockIntegrityBlock'>), ({'type_code': 12}, <class 'bp.encoding.bpsec.BlockConfidentialityBlock'>)]
class bp.encoding.blocks.PreviousNodeBlock(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: CborItem

Block data from BPbis Section 4.3.1.

fields_desc: List[AnyField] = [<EidField (PreviousNodeBlock).node>]
aliastypes = [<class 'bp.encoding.blocks.PreviousNodeBlock'>, <class 'scapy_cbor.packets.CborItem'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]
class bp.encoding.blocks.BundleAgeBlock(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: CborItem

Block data from BPbis Section 4.3.2.

fields_desc: List[AnyField] = [<UintField (BundleAgeBlock).age>]
aliastypes = [<class 'bp.encoding.blocks.BundleAgeBlock'>, <class 'scapy_cbor.packets.CborItem'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]
class bp.encoding.blocks.HopCountBlock(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: CborArray

Block data from BPbis Section 4.3.3.

aliastypes = [<class 'bp.encoding.blocks.HopCountBlock'>, <class 'scapy_cbor.packets.CborArray'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]
fields_desc: List[AnyField] = [<UintField (HopCountBlock).limit>, <UintField (HopCountBlock).count>]

Blocks for BPSEC.

class bp.encoding.bpsec.TypeValuePair(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: CborArray

A pattern for an array encoding which contains exactly two values.

fields_desc: List[AnyField] = [<UintField (TypeValuePair).type_code>, <CborField (TypeValuePair).value>]
aliastypes = [<class 'bp.encoding.bpsec.TypeValuePair'>, <class 'scapy_cbor.packets.CborArray'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]
class bp.encoding.bpsec.TargetResultList(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: CborArray

A list of results for a single target.

fields_desc: List[AnyField] = [<PacketListField (TargetResultList).results>]
aliastypes = [<class 'bp.encoding.bpsec.TargetResultList'>, <class 'scapy_cbor.packets.CborArray'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]
class bp.encoding.bpsec.AbstractSecurityBlock(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: CborSequence

Block data from Section 3.6 of RFC 9172.

class Flag(*values)

Bases: IntFlag

Security flags. Flags must be in LSbit-first order.

NONE = 0
PARAMETERS_PRESENT = 1
fields_desc: List[AnyField] = [<ArrayWrapField (AbstractSecurityBlock,BlockIntegrityBlock,BlockConfidentialityBlock).targets>, <UintField (AbstractSecurityBlock,BlockIntegrityBlock,BlockConfidentialityBlock).context_id>, <FlagsField (AbstractSecurityBlock,BlockIntegrityBlock,BlockConfidentialityBlock).context_flags>, <EidField (AbstractSecurityBlock,BlockIntegrityBlock,BlockConfidentialityBlock).source>, <scapy.fields.ConditionalField object>, <ArrayWrapField (AbstractSecurityBlock,BlockIntegrityBlock,BlockConfidentialityBlock).results>]
aliastypes = [<class 'bp.encoding.bpsec.AbstractSecurityBlock'>, <class 'scapy_cbor.packets.CborSequence'>, <class 'scapy_cbor.packets.CborArray'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]
class bp.encoding.bpsec.BlockIntegrityBlock(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: AbstractSecurityBlock

Block data from RFC 9172

aliastypes = [<class 'bp.encoding.bpsec.BlockIntegrityBlock'>, <class 'bp.encoding.bpsec.AbstractSecurityBlock'>, <class 'scapy_cbor.packets.CborSequence'>, <class 'scapy_cbor.packets.CborArray'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]
fields_desc: List[AnyField] = [<ArrayWrapField (AbstractSecurityBlock,BlockIntegrityBlock,BlockConfidentialityBlock).targets>, <UintField (AbstractSecurityBlock,BlockIntegrityBlock,BlockConfidentialityBlock).context_id>, <FlagsField (AbstractSecurityBlock,BlockIntegrityBlock,BlockConfidentialityBlock).context_flags>, <EidField (AbstractSecurityBlock,BlockIntegrityBlock,BlockConfidentialityBlock).source>, <scapy.fields.ConditionalField object>, <ArrayWrapField (AbstractSecurityBlock,BlockIntegrityBlock,BlockConfidentialityBlock).results>]
class bp.encoding.bpsec.BlockConfidentialityBlock(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: AbstractSecurityBlock

Block data from RFC 9172

aliastypes = [<class 'bp.encoding.bpsec.BlockConfidentialityBlock'>, <class 'bp.encoding.bpsec.AbstractSecurityBlock'>, <class 'scapy_cbor.packets.CborSequence'>, <class 'scapy_cbor.packets.CborArray'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]
fields_desc: List[AnyField] = [<ArrayWrapField (AbstractSecurityBlock,BlockIntegrityBlock,BlockConfidentialityBlock).targets>, <UintField (AbstractSecurityBlock,BlockIntegrityBlock,BlockConfidentialityBlock).context_id>, <FlagsField (AbstractSecurityBlock,BlockIntegrityBlock,BlockConfidentialityBlock).context_flags>, <EidField (AbstractSecurityBlock,BlockIntegrityBlock,BlockConfidentialityBlock).source>, <scapy.fields.ConditionalField object>, <ArrayWrapField (AbstractSecurityBlock,BlockIntegrityBlock,BlockConfidentialityBlock).results>]

Whole bundle encodings and helper functions.

class bp.encoding.bundle.Bundle(_pkt=b'', post_transform=None, _internal=0, _underlayer=None, _parent=None, stop_dissection_after=None, **fields)

Bases: CborArray

An entire decoded bundle contents.

Bundles with administrative records are handled specially in that the AdminRecord object will be made a (scapy) payload of the “payload block” which is block type code 1.

BLOCK_TYPE_PAYLOAD = 1
BLOCK_NUM_PAYLOAD = 1
fields_desc: List[AnyField] = [<PacketField (Bundle).primary>, <PacketListField (Bundle).blocks>]
self_build(field_pos_list=None)

Create the default layer regarding fields_desc dict

post_dissect(s)

DEV: is called right after the current layer has been dissected

fill_fields()

Fill all fields so that the bundle is the full size it needs to be for encoding encoding with build(). Derived classes should populate their block-type-specific-data also.

update_all_crc()

Update all CRC fields in this bundle which are not yet set.

check_all_crc()

Check for CRC failures.

Return type:

Set[int]

Returns:

The set of block numbers with failed CRC check.

aliastypes = [<class 'bp.encoding.bundle.Bundle'>, <class 'scapy_cbor.packets.CborArray'>, <class 'scapy_cbor.packets.AbstractCborStruct'>, <class 'scapy.packet.Packet'>]

BP-specific field types.

class bp.encoding.fields.EidField(name, default=None)

Bases: CborField

A structured representation of an Endpoint ID. Only specific URI schemes are encodable.

class TypeCode(*values)

Bases: IntEnum

EID scheme codes. Flags must be in LSbit-first order.

dtn = 1
ipn = 2
class WellKnownSsp(*values)

Bases: IntEnum

Integer-valued well-known SSP.

none = 0
i2m(pkt, x)

Encode this field to a CBOR item.

Parameters:
  • pkt – The packet (container) context.

  • x – The internal data value.

Returns:

The CBOR item.

Return type:

A value or IGNORE.

m2i(pkt, x)

Decode this field from a CBOR item.

Parameters:
  • pkt – The packet (container) context.

  • x – The CBOR item value.

Returns:

The internal data value.

Return type:

A value or IGNORE.

randval()

Return a volatile object whose value is both random and suitable for this field

class bp.encoding.fields.DtnTimeField(name, default=None, maxval=None)

Bases: UintField

A DTN time value. This value is automatically converted from a datetime.datetime object and text.

DTN_EPOCH = datetime.datetime(2000, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
static datetime_to_dtntime(val)
static dtntime_to_datetime(val)
i2h(pkt, x)

Convert internal value to human value

i2repr(pkt, x)

Convert internal value to a nice representation

h2i(pkt, x)

Convert human value to internal value

any2i(pkt, x)

Try to understand the most input values possible and make an internal value from them

randval()

Return a volatile object whose value is both random and suitable for this field