-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathoffer_method_tcp.py
124 lines (99 loc) · 4.37 KB
/
offer_method_tcp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import asyncio
import ipaddress
import logging
import sys
from typing import Tuple
from someipy import TransportLayerProtocol, MethodResult, MessageType, ReturnCode
from someipy.service import ServiceBuilder, Method
from someipy.service_discovery import construct_service_discovery
from someipy.server_service_instance import construct_server_service_instance
from someipy.logging import set_someipy_log_level
from someipy.serialization import Sint32
from addition_method_parameters import Addends, Sum
# Multicast group and port handed to construct_service_discovery() in main();
# the service discovery (SD) UDP packets are sent using these values.
SD_MULTICAST_GROUP = "224.224.224.245"
SD_PORT = 30490
DEFAULT_INTERFACE_IP = "127.0.0.1" # Used when --interface_ip is not given on the command line
# Identifiers of the sample service: the service ID is passed to the ServiceBuilder,
# the instance ID to construct_server_service_instance() and the method ID to Method().
SAMPLE_SERVICE_ID = 0x1234
SAMPLE_INSTANCE_ID = 0x5678
SAMPLE_METHOD_ID = 0x0123
def _format_hex(data: bytes) -> str:
    """Render *data* as space-separated ``0x..`` byte values for console output."""
    return " ".join(f"0x{b:02x}" for b in data)


async def add_method_handler(input_data: bytes, addr: Tuple[str, int]) -> MethodResult:
    """Handle one call of the addition method and build the reply.

    The payload is deserialized into an ``Addends`` structure, its two addends
    are added, and the result is returned as the serialized ``Sum``.

    Args:
        input_data: Raw payload bytes of the incoming request.
        addr: ``(ip, port)`` of the caller; only used for the console output.

    Returns:
        A ``MethodResult`` of message type ``RESPONSE``. On success the return
        code is ``E_OK`` and the payload holds the serialized ``Sum``. If the
        request cannot be deserialized the return code is
        ``E_MALFORMED_MESSAGE`` and no payload is set by this handler.
    """
    print(
        f"Received data: {_format_hex(input_data)} from IP: {addr[0]} Port: {addr[1]}"
    )

    result = MethodResult()
    # Both the error path and the success path answer with a RESPONSE message
    result.message_type = MessageType.RESPONSE

    try:
        # Deserialize the input data
        addends = Addends()
        addends.deserialize(input_data)
    except Exception as e:
        # Any failure while decoding the request is reported back to the caller
        # as a malformed message instead of letting the exception escape
        print(f"Error during deserialization: {e}")
        result.return_code = ReturnCode.E_MALFORMED_MESSAGE
        return result

    # Perform the addition ("total" avoids shadowing the builtin sum())
    # NOTE(review): the result is not range-checked before being wrapped in
    # Sint32 - confirm how Sint32 treats values outside the 32 bit range
    total = Sum()
    total.value = Sint32(addends.addend1.value + addends.addend2.value)

    # Serialize once and reuse the bytes for both the log line and the reply
    payload = total.serialize()
    print(f"Send back: {_format_hex(payload)}")

    result.return_code = ReturnCode.E_OK
    result.payload = payload
    return result
def _interface_ip_from_argv(argv: list, default: str) -> str:
    """Return the value following the first ``--interface_ip`` in *argv*.

    Falls back to *default* when the flag is absent or is the last argument
    (i.e. no value follows it).
    """
    try:
        value_index = argv.index("--interface_ip") + 1
    except ValueError:
        return default
    return argv[value_index] if value_index < len(argv) else default


async def main():
    """Offer the sample addition service via TCP until the task is cancelled.

    Sets up service discovery on the chosen interface, builds the service with
    its single addition method, creates a server service instance for it and
    starts the cyclic offer. The coroutine then waits until it is cancelled,
    stops the offer and closes the service discovery.

    Raises:
        ipaddress.AddressValueError: If the interface IP (from
            ``--interface_ip`` or the default) is not a valid IPv4 address.
    """
    # It's possible to configure the logging level of the someipy library, e.g. logging.INFO, logging.DEBUG, logging.WARN, ..
    set_someipy_log_level(logging.DEBUG)

    # Get interface ip to use from command line argument (--interface_ip) or use default
    interface_ip = _interface_ip_from_argv(sys.argv, DEFAULT_INTERFACE_IP)

    # Parse the address before anything is opened, so that a malformed value
    # fails right away instead of after service discovery has been set up
    endpoint_ip = ipaddress.IPv4Address(interface_ip)

    # Since the construction of the class ServiceDiscoveryProtocol is not trivial and would require an async __init__ function
    # use the construct_service_discovery function
    # The local interface IP address needs to be passed so that the src-address of all SD UDP packets is correctly set
    service_discovery = await construct_service_discovery(
        SD_MULTICAST_GROUP, SD_PORT, interface_ip
    )

    # From here on service_discovery has to be closed on every exit path,
    # including errors or cancellation during the remaining setup
    try:
        addition_method = Method(id=SAMPLE_METHOD_ID, method_handler=add_method_handler)

        addition_service = (
            ServiceBuilder()
            .with_service_id(SAMPLE_SERVICE_ID)
            .with_major_version(1)
            .with_method(addition_method)
            .build()
        )

        # For offering methods use a ServerServiceInstance
        service_instance_addition = await construct_server_service_instance(
            addition_service,
            instance_id=SAMPLE_INSTANCE_ID,
            endpoint=(
                endpoint_ip,
                3000,
            ),  # src IP and port of the service
            ttl=5,
            sd_sender=service_discovery,
            cyclic_offer_delay_ms=2000,
            protocol=TransportLayerProtocol.TCP,
        )

        # The service instance has to be attached always to the ServiceDiscoveryProtocol object, so that the service instance
        # is notified by the ServiceDiscoveryProtocol about e.g. subscriptions from other ECUs
        service_discovery.attach(service_instance_addition)

        # ..it's also possible to construct another ServerServiceInstance and attach it to service_discovery as well

        # After constructing and attaching ServerServiceInstances to the ServiceDiscoveryProtocol object the
        # start_offer method has to be called. This will start an internal timer, which will periodically send
        # Offer service entries with a period of "cyclic_offer_delay_ms" which has been passed above
        print("Start offering service..")
        service_instance_addition.start_offer()

        try:
            # Keep the task alive
            await asyncio.Future()
        except asyncio.CancelledError:
            print("Stop offering service..")
            await service_instance_addition.stop_offer()
    finally:
        print("Service Discovery close..")
        service_discovery.close()

    print("End main task..")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop this example. main() handles the
        # resulting cancellation itself, so an interrupt that still propagates
        # out of asyncio.run() is not an error worth a traceback.
        pass