1use arrayvec::ArrayVec;
2
3use super::{state::PortState, ForwardedTLVProvider, Port, PortActionIterator, Running};
4use crate::{
5 datastructures::{
6 common::{PortIdentity, Tlv, TlvSetBuilder, TlvType},
7 messages::{DelayReqMessage, Header, Message, MAX_DATA_LEN},
8 },
9 filters::Filter,
10 port::{actions::TimestampContextInner, PortAction, TimestampContext},
11 ptp_instance::PtpInstanceStateMutex,
12 time::Time,
13};
14
15impl<A, C, F: Filter, R, S: PtpInstanceStateMutex> Port<'_, Running, A, R, C, F, S> {
16 pub(super) fn send_sync(&mut self) -> PortActionIterator<'_> {
17 if matches!(self.port_state, PortState::Master) {
18 log::trace!("sending sync message");
19
20 let seq_id = self.sync_seq_ids.generate();
21 let packet_length = match self
22 .instance_state
23 .with_ref(|state| {
24 Message::sync(
25 &state.default_ds,
26 self.port_identity,
27 seq_id,
28 self.config.minor_ptp_version.into(),
29 )
30 })
31 .serialize(&mut self.packet_buffer)
32 {
33 Ok(message) => message,
34 Err(error) => {
35 log::error!("Statime bug: Could not serialize sync: {:?}", error);
36 return actions![];
37 }
38 };
39
40 actions![
41 PortAction::ResetSyncTimer {
42 duration: self.config.sync_interval.as_core_duration(),
43 },
44 PortAction::SendEvent {
45 context: TimestampContext {
46 inner: TimestampContextInner::Sync { id: seq_id },
47 },
48 data: &self.packet_buffer[..packet_length],
49 link_local: false,
50 }
51 ]
52 } else {
53 actions![]
54 }
55 }
56
57 pub(super) fn handle_sync_timestamp(
58 &mut self,
59 id: u16,
60 timestamp: Time,
61 ) -> PortActionIterator<'_> {
62 if matches!(self.port_state, PortState::Master) {
63 let packet_length = match self
64 .instance_state
65 .with_ref(|state| {
66 Message::follow_up(
67 &state.default_ds,
68 self.port_identity,
69 id,
70 timestamp,
71 self.config.minor_ptp_version.into(),
72 )
73 })
74 .serialize(&mut self.packet_buffer)
75 {
76 Ok(length) => length,
77 Err(error) => {
78 log::error!(
79 "Statime bug: Could not serialize sync follow up {:?}",
80 error
81 );
82 return actions![];
83 }
84 };
85
86 actions![PortAction::SendGeneral {
87 data: &self.packet_buffer[..packet_length],
88 link_local: false,
89 }]
90 } else {
91 actions![]
92 }
93 }
94
95 pub(super) fn send_announce(
96 &mut self,
97 tlv_provider: &mut impl ForwardedTLVProvider,
98 ) -> PortActionIterator<'_> {
99 if matches!(self.port_state, PortState::Master) {
100 log::trace!("sending announce message");
101
102 let mut tlv_buffer = [0; MAX_DATA_LEN];
103 let mut tlv_builder = TlvSetBuilder::new(&mut tlv_buffer);
104
105 let mut message = self.instance_state.with_ref(|state| {
106 Message::announce(
107 state,
108 self.port_identity,
109 self.announce_seq_ids.generate(),
110 self.config.minor_ptp_version.into(),
111 )
112 });
113 let mut tlv_margin = MAX_DATA_LEN - message.wire_size();
114
115 let path_trace_enabled = self.instance_state.with_ref(|state| {
116 let default_ds = &state.default_ds;
117 let path_trace_ds = &state.path_trace_ds;
118 if path_trace_ds.enable {
119 'path_trace: {
120 let mut path = path_trace_ds.list.clone();
121 if path.try_push(default_ds.clock_identity).is_err() {
122 break 'path_trace;
123 }
124
125 let value: ArrayVec<u8, MAX_DATA_LEN> =
126 path.into_iter().flat_map(|ci| ci.0).collect();
127 let tlv = Tlv {
128 tlv_type: TlvType::PathTrace,
129 value: value.as_slice().into(),
130 };
131
132 let tlv_size = tlv.wire_size();
133 if tlv_margin > tlv_size {
134 tlv_margin -= tlv_size;
135 tlv_builder.add(tlv).unwrap();
137 }
138 }
139 }
140
141 path_trace_ds.enable
142 });
143
144 while let Some(tlv) = tlv_provider.next_if_smaller(tlv_margin) {
145 assert!(tlv.size() < tlv_margin);
146 let parent_port_identity = self
147 .instance_state
148 .with_ref(|s| s.parent_ds.parent_port_identity);
149 if parent_port_identity != tlv.sender_identity {
150 continue;
152 }
153
154 if path_trace_enabled && tlv.tlv.tlv_type == TlvType::PathTrace {
156 continue;
157 }
158
159 tlv_margin -= tlv.size();
160 tlv_builder.add(tlv.tlv).unwrap();
162 }
163
164 message.suffix = tlv_builder.build();
165
166 let packet_length = match message.serialize(&mut self.packet_buffer) {
167 Ok(length) => length,
168 Err(error) => {
169 log::error!(
170 "Statime bug: Could not serialize announce message {:?}",
171 error
172 );
173 return actions![];
174 }
175 };
176
177 actions![
178 PortAction::ResetAnnounceTimer {
179 duration: self.config.announce_interval.as_core_duration(),
180 },
181 PortAction::SendGeneral {
182 data: &self.packet_buffer[..packet_length],
183 link_local: false,
184 }
185 ]
186 } else {
187 actions![]
188 }
189 }
190
191 pub(super) fn handle_delay_req(
192 &mut self,
193 header: Header,
194 message: DelayReqMessage,
195 timestamp: Time,
196 ) -> PortActionIterator<'_> {
197 if matches!(self.port_state, PortState::Master) {
198 log::debug!("Received DelayReq");
199 let delay_resp_message = Message::delay_resp(
200 header,
201 message,
202 self.port_identity,
203 self.config.min_delay_req_interval(),
204 timestamp,
205 );
206
207 let packet_length = match delay_resp_message.serialize(&mut self.packet_buffer) {
208 Ok(length) => length,
209 Err(error) => {
210 log::error!("Could not serialize delay response: {:?}", error);
211 return actions![];
212 }
213 };
214
215 actions![PortAction::SendGeneral {
216 data: &self.packet_buffer[..packet_length],
217 link_local: false,
218 }]
219 } else {
220 actions![]
221 }
222 }
223
224 pub(super) fn handle_pdelay_req(
225 &mut self,
226 header: Header,
227 timestamp: Time,
228 ) -> PortActionIterator<'_> {
229 log::debug!("Received PDelayReq");
230 let pdelay_resp_message = self.instance_state.with_ref(|state| {
231 Message::pdelay_resp(
232 &state.default_ds,
233 self.port_identity,
234 header,
235 timestamp,
236 self.config.minor_ptp_version.into(),
237 )
238 });
239
240 let packet_length = match pdelay_resp_message.serialize(&mut self.packet_buffer) {
241 Ok(length) => length,
242 Err(error) => {
243 log::error!("Could not serialize pdelay response: {:?}", error);
244 return actions![];
245 }
246 };
247
248 actions![PortAction::SendEvent {
249 data: &self.packet_buffer[..packet_length],
250 context: TimestampContext {
251 inner: TimestampContextInner::PDelayResp {
252 id: header.sequence_id,
253 requestor_identity: header.source_port_identity
254 }
255 },
256 link_local: true,
257 }]
258 }
259
260 pub(super) fn handle_pdelay_response_timestamp(
261 &mut self,
262 id: u16,
263 requestor_identity: PortIdentity,
264 timestamp: Time,
265 ) -> PortActionIterator<'_> {
266 let pdelay_resp_follow_up_messgae = self.instance_state.with_ref(|state| {
267 Message::pdelay_resp_follow_up(
268 &state.default_ds,
269 self.port_identity,
270 requestor_identity,
271 id,
272 timestamp,
273 self.config.minor_ptp_version.into(),
274 )
275 });
276
277 let packet_length = match pdelay_resp_follow_up_messgae.serialize(&mut self.packet_buffer) {
278 Ok(length) => length,
279 Err(error) => {
280 log::error!("Could not serialize pdelay_response_followup: {:?}", error);
281 return actions![];
282 }
283 };
284
285 actions![PortAction::SendGeneral {
286 data: &self.packet_buffer[..packet_length],
287 link_local: true,
288 }]
289 }
290}
291
292#[cfg(test)]
293mod tests {
294 use fixed::types::{I48F16, U96F32};
295
296 use super::*;
297 use crate::{
298 config::DelayMechanism,
299 datastructures::{
300 common::{PortIdentity, TimeInterval},
301 datasets::PathTraceDS,
302 messages::{Header, MessageBody},
303 },
304 port::{
305 tests::{setup_test_port, setup_test_state},
306 NoForwardedTLVs,
307 },
308 time::Interval,
309 };
310
311 #[test]
312 fn test_delay_response() {
313 let state = setup_test_state();
314
315 let mut port = setup_test_port(&state);
316
317 port.set_forced_port_state(PortState::Master);
318
319 port.config.delay_mechanism = DelayMechanism::E2E {
320 interval: Interval::from_log_2(2),
321 };
322
323 let mut action = port.handle_delay_req(
324 Header {
325 sequence_id: 5123,
326 source_port_identity: PortIdentity {
327 port_number: 83,
328 ..Default::default()
329 },
330 correction_field: TimeInterval(I48F16::from_bits(400)),
331 ..Header::new(1)
332 },
333 DelayReqMessage {
334 origin_timestamp: Time::from_micros(0).into(),
335 },
336 Time::from_fixed_nanos(U96F32::from_bits((200000 << 32) + (500 << 16))),
337 );
338
339 let Some(PortAction::SendGeneral {
340 data,
341 link_local: false,
342 }) = action.next()
343 else {
344 panic!("Unexpected resulting action");
345 };
346 assert!(action.next().is_none());
347 drop(action);
348
349 let msg = Message::deserialize(data).unwrap();
350 let msg_header = msg.header;
351
352 let msg = match msg.body {
353 MessageBody::DelayResp(msg) => msg,
354 _ => panic!("Unexpected message type"),
355 };
356
357 assert_eq!(
358 msg.requesting_port_identity,
359 PortIdentity {
360 port_number: 83,
361 ..Default::default()
362 }
363 );
364 assert_eq!(msg_header.sequence_id, 5123);
365 assert_eq!(msg.receive_timestamp, Time::from_micros(200).into());
366 assert_eq!(msg_header.log_message_interval, 2);
367 assert_eq!(
368 msg_header.correction_field,
369 TimeInterval(I48F16::from_bits(900))
370 );
371
372 port.config.delay_mechanism = DelayMechanism::E2E {
373 interval: Interval::from_log_2(5),
374 };
375
376 let mut action = port.handle_delay_req(
377 Header {
378 sequence_id: 879,
379 source_port_identity: PortIdentity {
380 port_number: 12,
381 ..Default::default()
382 },
383 correction_field: TimeInterval(I48F16::from_bits(200)),
384 ..Header::new(1)
385 },
386 DelayReqMessage {
387 origin_timestamp: Time::from_micros(0).into(),
388 },
389 Time::from_fixed_nanos(U96F32::from_bits((220000 << 32) + (300 << 16))),
390 );
391
392 let Some(PortAction::SendGeneral {
393 data,
394 link_local: false,
395 }) = action.next()
396 else {
397 panic!("Unexpected resulting action");
398 };
399 assert!(action.next().is_none());
400
401 let msg = Message::deserialize(data).unwrap();
402 let msg_header = msg.header;
403
404 let msg = match msg.body {
405 MessageBody::DelayResp(msg) => msg,
406 _ => panic!("Unexpected message type"),
407 };
408
409 assert_eq!(
410 msg.requesting_port_identity,
411 PortIdentity {
412 port_number: 12,
413 ..Default::default()
414 }
415 );
416 assert_eq!(msg_header.sequence_id, 879);
417 assert_eq!(msg.receive_timestamp, Time::from_micros(220).into());
418 assert_eq!(msg_header.log_message_interval, 5);
419 assert_eq!(
420 msg_header.correction_field,
421 TimeInterval(I48F16::from_bits(500))
422 );
423 }
424
425 #[test]
426 fn test_announce() {
427 let state = setup_test_state();
428
429 let mut state_ref = state.borrow_mut();
430 state_ref.default_ds.priority_1 = 15;
431 state_ref.default_ds.priority_2 = 128;
432 state_ref.parent_ds.grandmaster_priority_1 = 15;
433 state_ref.parent_ds.grandmaster_priority_2 = 128;
434
435 drop(state_ref);
436
437 let mut port = setup_test_port(&state);
438
439 port.set_forced_port_state(PortState::Master);
440
441 let mut actions = port.send_announce(&mut NoForwardedTLVs);
442
443 assert!(matches!(
444 actions.next(),
445 Some(PortAction::ResetAnnounceTimer { .. })
446 ));
447 let Some(PortAction::SendGeneral {
448 data,
449 link_local: false,
450 }) = actions.next()
451 else {
452 panic!("Unexpected action");
453 };
454 assert!(actions.next().is_none());
455 drop(actions);
456
457 let msg = Message::deserialize(data).unwrap();
458 let msg_header = msg.header;
459
460 let msg_body = match msg.body {
461 MessageBody::Announce(msg) => msg,
462 _ => panic!("Unexpected message type"),
463 };
464
465 assert_eq!(msg_body.grandmaster_priority_1, 15);
466 assert_eq!(msg.suffix, Default::default());
467
468 let mut actions = port.send_announce(&mut NoForwardedTLVs);
469
470 assert!(matches!(
471 actions.next(),
472 Some(PortAction::ResetAnnounceTimer { .. })
473 ));
474 let Some(PortAction::SendGeneral {
475 data,
476 link_local: false,
477 }) = actions.next()
478 else {
479 panic!("Unexpected action");
480 };
481 assert!(actions.next().is_none());
482
483 let msg2 = Message::deserialize(data).unwrap();
484 let msg2_header = msg2.header;
485
486 let msg2_body = match msg2.body {
487 MessageBody::Announce(msg) => msg,
488 _ => panic!("Unexpected message type"),
489 };
490
491 assert_eq!(msg2_body.grandmaster_priority_1, 15);
492 assert_eq!(msg2.suffix, Default::default());
493 assert_ne!(msg2_header.sequence_id, msg_header.sequence_id);
494 }
495
496 #[test]
497 fn test_announce_path_trace() {
498 let state = setup_test_state();
499
500 let mut state_ref = state.borrow_mut();
501 state_ref.default_ds.priority_1 = 15;
502 state_ref.default_ds.priority_2 = 128;
503 state_ref.parent_ds.grandmaster_priority_1 = 15;
504 state_ref.parent_ds.grandmaster_priority_2 = 128;
505 state_ref.path_trace_ds = PathTraceDS::new(true);
506
507 drop(state_ref);
508
509 let mut port = setup_test_port(&state);
510
511 port.set_forced_port_state(PortState::Master);
512
513 let mut actions = port.send_announce(&mut NoForwardedTLVs);
514
515 assert!(matches!(
516 actions.next(),
517 Some(PortAction::ResetAnnounceTimer { .. })
518 ));
519 let Some(PortAction::SendGeneral {
520 data,
521 link_local: false,
522 }) = actions.next()
523 else {
524 panic!("Unexpected action");
525 };
526 assert!(actions.next().is_none());
527 drop(actions);
528
529 let msg = Message::deserialize(data).unwrap();
530
531 let msg_body = match msg.body {
532 MessageBody::Announce(msg) => msg,
533 _ => panic!("Unexpected message type"),
534 };
535
536 assert_eq!(msg_body.grandmaster_priority_1, 15);
537
538 let mut tlvs = msg.suffix.tlv();
539 let Some(Tlv {
540 tlv_type: TlvType::PathTrace,
541 value,
542 }) = tlvs.next()
543 else {
544 panic!("Unexpected or missing TLV")
545 };
546 assert_eq!(value.as_ref(), [0; 8].as_ref());
547 assert!(tlvs.next().is_none());
548 }
549
    /// Runs two sync / follow-up rounds on a master port and checks that each
    /// follow-up matches its sync, that the two syncs use different sequence
    /// ids, and that the follow-up carries the expected timestamp and
    /// correction field.
    #[test]
    fn test_sync() {
        let state = setup_test_state();

        let mut state_ref = state.borrow_mut();
        state_ref.default_ds.priority_1 = 15;
        state_ref.default_ds.priority_2 = 128;
        state_ref.parent_ds.grandmaster_priority_1 = 15;
        state_ref.parent_ds.grandmaster_priority_2 = 128;

        // Release the mutable borrow before the port is created.
        drop(state_ref);

        let mut port = setup_test_port(&state);

        port.set_forced_port_state(PortState::Master);

        // --- Round 1: sync ---
        let mut actions = port.send_sync();

        assert!(matches!(
            actions.next(),
            Some(PortAction::ResetSyncTimer { .. })
        ));
        let Some(PortAction::SendEvent {
            context,
            data,
            link_local: false,
        }) = actions.next()
        else {
            panic!("Unexpected action");
        };
        assert!(actions.next().is_none());
        drop(actions);

        let sync = Message::deserialize(data).unwrap();
        let sync_header = sync.header;

        // Only the message type matters here; the body itself is not inspected.
        let _sync = match sync.body {
            MessageBody::Sync(msg) => msg,
            _ => panic!("Unexpected message type"),
        };

        // The sequence id from the send context is fed back with the timestamp.
        let id = match context.inner {
            TimestampContextInner::Sync { id } => id,
            _ => panic!("Wrong type of context"),
        };

        // --- Round 1: follow-up ---
        let mut actions = port.handle_sync_timestamp(
            id,
            Time::from_fixed_nanos(U96F32::from_bits((601300 << 32) + (230 << 16))),
        );

        let Some(PortAction::SendGeneral {
            data,
            link_local: false,
        }) = actions.next()
        else {
            panic!("Unexpected action");
        };
        assert!(actions.next().is_none());
        drop(actions);

        let follow = Message::deserialize(data).unwrap();
        let follow_header = follow.header;

        let follow = match follow.body {
            MessageBody::FollowUp(msg) => msg,
            _ => panic!("Unexpected message type"),
        };

        assert_eq!(sync_header.sequence_id, follow_header.sequence_id);
        assert_eq!(
            sync_header.correction_field,
            TimeInterval(I48F16::from_bits(0))
        );
        assert_eq!(
            follow.precise_origin_timestamp,
            Time::from_fixed_nanos(601300).into()
        );
        // NOTE(review): 230 matches the value placed in the fractional bits of
        // the timestamp above; confirm that this is the intended derivation.
        assert_eq!(
            follow_header.correction_field,
            TimeInterval(I48F16::from_bits(230))
        );

        // --- Round 2: sync ---
        let mut actions = port.send_sync();

        assert!(matches!(
            actions.next(),
            Some(PortAction::ResetSyncTimer { .. })
        ));
        let Some(PortAction::SendEvent {
            context,
            data,
            link_local: false,
        }) = actions.next()
        else {
            panic!("Unexpected action");
        };
        assert!(actions.next().is_none());
        drop(actions);

        let sync2 = Message::deserialize(data).unwrap();
        let sync2_header = sync2.header;

        let _sync2 = match sync2.body {
            MessageBody::Sync(msg) => msg,
            _ => panic!("Unexpected message type"),
        };

        let id = match context.inner {
            TimestampContextInner::Sync { id } => id,
            _ => panic!("wrong type of context"),
        };

        // --- Round 2: follow-up ---
        let mut actions = port.handle_sync_timestamp(
            id,
            Time::from_fixed_nanos(U96F32::from_bits((1000601300 << 32) + (543 << 16))),
        );

        let Some(PortAction::SendGeneral {
            data,
            link_local: false,
        }) = actions.next()
        else {
            panic!("Unexpected action");
        };
        assert!(actions.next().is_none());

        let follow2 = Message::deserialize(data).unwrap();
        let follow2_header = follow2.header;

        let follow2 = match follow2.body {
            MessageBody::FollowUp(msg) => msg,
            _ => panic!("Unexpected message type"),
        };

        // The second sync must use a new sequence id, shared with its follow-up.
        assert_ne!(sync_header.sequence_id, sync2_header.sequence_id);
        assert_eq!(sync2_header.sequence_id, follow2_header.sequence_id);
        assert_eq!(
            sync2_header.correction_field,
            TimeInterval(I48F16::from_bits(0))
        );
        assert_eq!(
            follow2.precise_origin_timestamp,
            Time::from_fixed_nanos(1000601300).into()
        );
        assert_eq!(
            follow2_header.correction_field,
            TimeInterval(I48F16::from_bits(543))
        );
    }
699
700 #[test]
701 fn test_peer_delay() {
702 let state = setup_test_state();
703
704 let mut port = setup_test_port(&state);
705
706 let mut actions = port.handle_pdelay_req(Header::new(1), Time::from_micros(500));
707
708 let Some(PortAction::SendEvent {
709 context,
710 data,
711 link_local: true,
712 }) = actions.next()
713 else {
714 panic!("Unexpected action");
715 };
716
717 let response = Message::deserialize(data).unwrap();
718 let MessageBody::PDelayResp(response_body) = response.body else {
719 panic!("Unexpected message sent by port");
720 };
721 assert_eq!(
722 response_body.request_receive_timestamp,
723 Time::from_micros(500).into()
724 );
725 assert!(actions.next().is_none());
726 drop(actions);
727
728 let mut actions = port.handle_send_timestamp(context, Time::from_micros(550));
729
730 let Some(PortAction::SendGeneral {
731 data,
732 link_local: true,
733 }) = actions.next()
734 else {
735 panic!("Unexpected action");
736 };
737
738 let response = Message::deserialize(data).unwrap();
739 let MessageBody::PDelayRespFollowUp(response_body) = response.body else {
740 panic!("Unexpected message sent by port");
741 };
742 assert_eq!(
743 response_body.response_origin_timestamp,
744 Time::from_micros(550).into()
745 );
746 assert!(actions.next().is_none());
747 drop(actions);
748 }
749}