statime/port/
master.rs

1use arrayvec::ArrayVec;
2
3use super::{state::PortState, ForwardedTLVProvider, Port, PortActionIterator, Running};
4use crate::{
5    datastructures::{
6        common::{PortIdentity, Tlv, TlvSetBuilder, TlvType},
7        messages::{DelayReqMessage, Header, Message, MAX_DATA_LEN},
8    },
9    filters::Filter,
10    port::{actions::TimestampContextInner, PortAction, TimestampContext},
11    ptp_instance::PtpInstanceStateMutex,
12    time::Time,
13};
14
15impl<A, C, F: Filter, R, S: PtpInstanceStateMutex> Port<'_, Running, A, R, C, F, S> {
16    pub(super) fn send_sync(&mut self) -> PortActionIterator<'_> {
17        if matches!(self.port_state, PortState::Master) {
18            log::trace!("sending sync message");
19
20            let seq_id = self.sync_seq_ids.generate();
21            let packet_length = match self
22                .instance_state
23                .with_ref(|state| {
24                    Message::sync(
25                        &state.default_ds,
26                        self.port_identity,
27                        seq_id,
28                        self.config.minor_ptp_version.into(),
29                    )
30                })
31                .serialize(&mut self.packet_buffer)
32            {
33                Ok(message) => message,
34                Err(error) => {
35                    log::error!("Statime bug: Could not serialize sync: {:?}", error);
36                    return actions![];
37                }
38            };
39
40            actions![
41                PortAction::ResetSyncTimer {
42                    duration: self.config.sync_interval.as_core_duration(),
43                },
44                PortAction::SendEvent {
45                    context: TimestampContext {
46                        inner: TimestampContextInner::Sync { id: seq_id },
47                    },
48                    data: &self.packet_buffer[..packet_length],
49                    link_local: false,
50                }
51            ]
52        } else {
53            actions![]
54        }
55    }
56
57    pub(super) fn handle_sync_timestamp(
58        &mut self,
59        id: u16,
60        timestamp: Time,
61    ) -> PortActionIterator<'_> {
62        if matches!(self.port_state, PortState::Master) {
63            let packet_length = match self
64                .instance_state
65                .with_ref(|state| {
66                    Message::follow_up(
67                        &state.default_ds,
68                        self.port_identity,
69                        id,
70                        timestamp,
71                        self.config.minor_ptp_version.into(),
72                    )
73                })
74                .serialize(&mut self.packet_buffer)
75            {
76                Ok(length) => length,
77                Err(error) => {
78                    log::error!(
79                        "Statime bug: Could not serialize sync follow up {:?}",
80                        error
81                    );
82                    return actions![];
83                }
84            };
85
86            actions![PortAction::SendGeneral {
87                data: &self.packet_buffer[..packet_length],
88                link_local: false,
89            }]
90        } else {
91            actions![]
92        }
93    }
94
95    pub(super) fn send_announce(
96        &mut self,
97        tlv_provider: &mut impl ForwardedTLVProvider,
98    ) -> PortActionIterator<'_> {
99        if matches!(self.port_state, PortState::Master) {
100            log::trace!("sending announce message");
101
102            let mut tlv_buffer = [0; MAX_DATA_LEN];
103            let mut tlv_builder = TlvSetBuilder::new(&mut tlv_buffer);
104
105            let mut message = self.instance_state.with_ref(|state| {
106                Message::announce(
107                    state,
108                    self.port_identity,
109                    self.announce_seq_ids.generate(),
110                    self.config.minor_ptp_version.into(),
111                )
112            });
113            let mut tlv_margin = MAX_DATA_LEN - message.wire_size();
114
115            let path_trace_enabled = self.instance_state.with_ref(|state| {
116                let default_ds = &state.default_ds;
117                let path_trace_ds = &state.path_trace_ds;
118                if path_trace_ds.enable {
119                    'path_trace: {
120                        let mut path = path_trace_ds.list.clone();
121                        if path.try_push(default_ds.clock_identity).is_err() {
122                            break 'path_trace;
123                        }
124
125                        let value: ArrayVec<u8, MAX_DATA_LEN> =
126                            path.into_iter().flat_map(|ci| ci.0).collect();
127                        let tlv = Tlv {
128                            tlv_type: TlvType::PathTrace,
129                            value: value.as_slice().into(),
130                        };
131
132                        let tlv_size = tlv.wire_size();
133                        if tlv_margin > tlv_size {
134                            tlv_margin -= tlv_size;
135                            // Will not fail as previous checks ensure sufficient space in buffer.
136                            tlv_builder.add(tlv).unwrap();
137                        }
138                    }
139                }
140
141                path_trace_ds.enable
142            });
143
144            while let Some(tlv) = tlv_provider.next_if_smaller(tlv_margin) {
145                assert!(tlv.size() < tlv_margin);
146                let parent_port_identity = self
147                    .instance_state
148                    .with_ref(|s| s.parent_ds.parent_port_identity);
149                if parent_port_identity != tlv.sender_identity {
150                    // Ignore, shouldn't be forwarded
151                    continue;
152                }
153
154                // Don't forward PATH_TRACE TLVs, we processed them and added our own
155                if path_trace_enabled && tlv.tlv.tlv_type == TlvType::PathTrace {
156                    continue;
157                }
158
159                tlv_margin -= tlv.size();
160                // Will not fail as previous checks ensure sufficient space in buffer.
161                tlv_builder.add(tlv.tlv).unwrap();
162            }
163
164            message.suffix = tlv_builder.build();
165
166            let packet_length = match message.serialize(&mut self.packet_buffer) {
167                Ok(length) => length,
168                Err(error) => {
169                    log::error!(
170                        "Statime bug: Could not serialize announce message {:?}",
171                        error
172                    );
173                    return actions![];
174                }
175            };
176
177            actions![
178                PortAction::ResetAnnounceTimer {
179                    duration: self.config.announce_interval.as_core_duration(),
180                },
181                PortAction::SendGeneral {
182                    data: &self.packet_buffer[..packet_length],
183                    link_local: false,
184                }
185            ]
186        } else {
187            actions![]
188        }
189    }
190
191    pub(super) fn handle_delay_req(
192        &mut self,
193        header: Header,
194        message: DelayReqMessage,
195        timestamp: Time,
196    ) -> PortActionIterator<'_> {
197        if matches!(self.port_state, PortState::Master) {
198            log::debug!("Received DelayReq");
199            let delay_resp_message = Message::delay_resp(
200                header,
201                message,
202                self.port_identity,
203                self.config.min_delay_req_interval(),
204                timestamp,
205            );
206
207            let packet_length = match delay_resp_message.serialize(&mut self.packet_buffer) {
208                Ok(length) => length,
209                Err(error) => {
210                    log::error!("Could not serialize delay response: {:?}", error);
211                    return actions![];
212                }
213            };
214
215            actions![PortAction::SendGeneral {
216                data: &self.packet_buffer[..packet_length],
217                link_local: false,
218            }]
219        } else {
220            actions![]
221        }
222    }
223
224    pub(super) fn handle_pdelay_req(
225        &mut self,
226        header: Header,
227        timestamp: Time,
228    ) -> PortActionIterator<'_> {
229        log::debug!("Received PDelayReq");
230        let pdelay_resp_message = self.instance_state.with_ref(|state| {
231            Message::pdelay_resp(
232                &state.default_ds,
233                self.port_identity,
234                header,
235                timestamp,
236                self.config.minor_ptp_version.into(),
237            )
238        });
239
240        let packet_length = match pdelay_resp_message.serialize(&mut self.packet_buffer) {
241            Ok(length) => length,
242            Err(error) => {
243                log::error!("Could not serialize pdelay response: {:?}", error);
244                return actions![];
245            }
246        };
247
248        actions![PortAction::SendEvent {
249            data: &self.packet_buffer[..packet_length],
250            context: TimestampContext {
251                inner: TimestampContextInner::PDelayResp {
252                    id: header.sequence_id,
253                    requestor_identity: header.source_port_identity
254                }
255            },
256            link_local: true,
257        }]
258    }
259
260    pub(super) fn handle_pdelay_response_timestamp(
261        &mut self,
262        id: u16,
263        requestor_identity: PortIdentity,
264        timestamp: Time,
265    ) -> PortActionIterator<'_> {
266        let pdelay_resp_follow_up_messgae = self.instance_state.with_ref(|state| {
267            Message::pdelay_resp_follow_up(
268                &state.default_ds,
269                self.port_identity,
270                requestor_identity,
271                id,
272                timestamp,
273                self.config.minor_ptp_version.into(),
274            )
275        });
276
277        let packet_length = match pdelay_resp_follow_up_messgae.serialize(&mut self.packet_buffer) {
278            Ok(length) => length,
279            Err(error) => {
280                log::error!("Could not serialize pdelay_response_followup: {:?}", error);
281                return actions![];
282            }
283        };
284
285        actions![PortAction::SendGeneral {
286            data: &self.packet_buffer[..packet_length],
287            link_local: true,
288        }]
289    }
290}
291
292#[cfg(test)]
293mod tests {
294    use fixed::types::{I48F16, U96F32};
295
296    use super::*;
297    use crate::{
298        config::DelayMechanism,
299        datastructures::{
300            common::{PortIdentity, TimeInterval},
301            datasets::PathTraceDS,
302            messages::{Header, MessageBody},
303        },
304        port::{
305            tests::{setup_test_port, setup_test_state},
306            NoForwardedTLVs,
307        },
308        time::Interval,
309    };
310
311    #[test]
312    fn test_delay_response() {
313        let state = setup_test_state();
314
315        let mut port = setup_test_port(&state);
316
317        port.set_forced_port_state(PortState::Master);
318
319        port.config.delay_mechanism = DelayMechanism::E2E {
320            interval: Interval::from_log_2(2),
321        };
322
323        let mut action = port.handle_delay_req(
324            Header {
325                sequence_id: 5123,
326                source_port_identity: PortIdentity {
327                    port_number: 83,
328                    ..Default::default()
329                },
330                correction_field: TimeInterval(I48F16::from_bits(400)),
331                ..Header::new(1)
332            },
333            DelayReqMessage {
334                origin_timestamp: Time::from_micros(0).into(),
335            },
336            Time::from_fixed_nanos(U96F32::from_bits((200000 << 32) + (500 << 16))),
337        );
338
339        let Some(PortAction::SendGeneral {
340            data,
341            link_local: false,
342        }) = action.next()
343        else {
344            panic!("Unexpected resulting action");
345        };
346        assert!(action.next().is_none());
347        drop(action);
348
349        let msg = Message::deserialize(data).unwrap();
350        let msg_header = msg.header;
351
352        let msg = match msg.body {
353            MessageBody::DelayResp(msg) => msg,
354            _ => panic!("Unexpected message type"),
355        };
356
357        assert_eq!(
358            msg.requesting_port_identity,
359            PortIdentity {
360                port_number: 83,
361                ..Default::default()
362            }
363        );
364        assert_eq!(msg_header.sequence_id, 5123);
365        assert_eq!(msg.receive_timestamp, Time::from_micros(200).into());
366        assert_eq!(msg_header.log_message_interval, 2);
367        assert_eq!(
368            msg_header.correction_field,
369            TimeInterval(I48F16::from_bits(900))
370        );
371
372        port.config.delay_mechanism = DelayMechanism::E2E {
373            interval: Interval::from_log_2(5),
374        };
375
376        let mut action = port.handle_delay_req(
377            Header {
378                sequence_id: 879,
379                source_port_identity: PortIdentity {
380                    port_number: 12,
381                    ..Default::default()
382                },
383                correction_field: TimeInterval(I48F16::from_bits(200)),
384                ..Header::new(1)
385            },
386            DelayReqMessage {
387                origin_timestamp: Time::from_micros(0).into(),
388            },
389            Time::from_fixed_nanos(U96F32::from_bits((220000 << 32) + (300 << 16))),
390        );
391
392        let Some(PortAction::SendGeneral {
393            data,
394            link_local: false,
395        }) = action.next()
396        else {
397            panic!("Unexpected resulting action");
398        };
399        assert!(action.next().is_none());
400
401        let msg = Message::deserialize(data).unwrap();
402        let msg_header = msg.header;
403
404        let msg = match msg.body {
405            MessageBody::DelayResp(msg) => msg,
406            _ => panic!("Unexpected message type"),
407        };
408
409        assert_eq!(
410            msg.requesting_port_identity,
411            PortIdentity {
412                port_number: 12,
413                ..Default::default()
414            }
415        );
416        assert_eq!(msg_header.sequence_id, 879);
417        assert_eq!(msg.receive_timestamp, Time::from_micros(220).into());
418        assert_eq!(msg_header.log_message_interval, 5);
419        assert_eq!(
420            msg_header.correction_field,
421            TimeInterval(I48F16::from_bits(500))
422        );
423    }
424
425    #[test]
426    fn test_announce() {
427        let state = setup_test_state();
428
429        let mut state_ref = state.borrow_mut();
430        state_ref.default_ds.priority_1 = 15;
431        state_ref.default_ds.priority_2 = 128;
432        state_ref.parent_ds.grandmaster_priority_1 = 15;
433        state_ref.parent_ds.grandmaster_priority_2 = 128;
434
435        drop(state_ref);
436
437        let mut port = setup_test_port(&state);
438
439        port.set_forced_port_state(PortState::Master);
440
441        let mut actions = port.send_announce(&mut NoForwardedTLVs);
442
443        assert!(matches!(
444            actions.next(),
445            Some(PortAction::ResetAnnounceTimer { .. })
446        ));
447        let Some(PortAction::SendGeneral {
448            data,
449            link_local: false,
450        }) = actions.next()
451        else {
452            panic!("Unexpected action");
453        };
454        assert!(actions.next().is_none());
455        drop(actions);
456
457        let msg = Message::deserialize(data).unwrap();
458        let msg_header = msg.header;
459
460        let msg_body = match msg.body {
461            MessageBody::Announce(msg) => msg,
462            _ => panic!("Unexpected message type"),
463        };
464
465        assert_eq!(msg_body.grandmaster_priority_1, 15);
466        assert_eq!(msg.suffix, Default::default());
467
468        let mut actions = port.send_announce(&mut NoForwardedTLVs);
469
470        assert!(matches!(
471            actions.next(),
472            Some(PortAction::ResetAnnounceTimer { .. })
473        ));
474        let Some(PortAction::SendGeneral {
475            data,
476            link_local: false,
477        }) = actions.next()
478        else {
479            panic!("Unexpected action");
480        };
481        assert!(actions.next().is_none());
482
483        let msg2 = Message::deserialize(data).unwrap();
484        let msg2_header = msg2.header;
485
486        let msg2_body = match msg2.body {
487            MessageBody::Announce(msg) => msg,
488            _ => panic!("Unexpected message type"),
489        };
490
491        assert_eq!(msg2_body.grandmaster_priority_1, 15);
492        assert_eq!(msg2.suffix, Default::default());
493        assert_ne!(msg2_header.sequence_id, msg_header.sequence_id);
494    }
495
496    #[test]
497    fn test_announce_path_trace() {
498        let state = setup_test_state();
499
500        let mut state_ref = state.borrow_mut();
501        state_ref.default_ds.priority_1 = 15;
502        state_ref.default_ds.priority_2 = 128;
503        state_ref.parent_ds.grandmaster_priority_1 = 15;
504        state_ref.parent_ds.grandmaster_priority_2 = 128;
505        state_ref.path_trace_ds = PathTraceDS::new(true);
506
507        drop(state_ref);
508
509        let mut port = setup_test_port(&state);
510
511        port.set_forced_port_state(PortState::Master);
512
513        let mut actions = port.send_announce(&mut NoForwardedTLVs);
514
515        assert!(matches!(
516            actions.next(),
517            Some(PortAction::ResetAnnounceTimer { .. })
518        ));
519        let Some(PortAction::SendGeneral {
520            data,
521            link_local: false,
522        }) = actions.next()
523        else {
524            panic!("Unexpected action");
525        };
526        assert!(actions.next().is_none());
527        drop(actions);
528
529        let msg = Message::deserialize(data).unwrap();
530
531        let msg_body = match msg.body {
532            MessageBody::Announce(msg) => msg,
533            _ => panic!("Unexpected message type"),
534        };
535
536        assert_eq!(msg_body.grandmaster_priority_1, 15);
537
538        let mut tlvs = msg.suffix.tlv();
539        let Some(Tlv {
540            tlv_type: TlvType::PathTrace,
541            value,
542        }) = tlvs.next()
543        else {
544            panic!("Unexpected or missing TLV")
545        };
546        assert_eq!(value.as_ref(), [0; 8].as_ref());
547        assert!(tlvs.next().is_none());
548    }
549
550    #[test]
551    fn test_sync() {
552        let state = setup_test_state();
553
554        let mut state_ref = state.borrow_mut();
555        state_ref.default_ds.priority_1 = 15;
556        state_ref.default_ds.priority_2 = 128;
557        state_ref.parent_ds.grandmaster_priority_1 = 15;
558        state_ref.parent_ds.grandmaster_priority_2 = 128;
559
560        drop(state_ref);
561
562        let mut port = setup_test_port(&state);
563
564        port.set_forced_port_state(PortState::Master);
565        let mut actions = port.send_sync();
566
567        assert!(matches!(
568            actions.next(),
569            Some(PortAction::ResetSyncTimer { .. })
570        ));
571        let Some(PortAction::SendEvent {
572            context,
573            data,
574            link_local: false,
575        }) = actions.next()
576        else {
577            panic!("Unexpected action");
578        };
579        assert!(actions.next().is_none());
580        drop(actions);
581
582        let sync = Message::deserialize(data).unwrap();
583        let sync_header = sync.header;
584
585        let _sync = match sync.body {
586            MessageBody::Sync(msg) => msg,
587            _ => panic!("Unexpected message type"),
588        };
589
590        let id = match context.inner {
591            TimestampContextInner::Sync { id } => id,
592            _ => panic!("Wrong type of context"),
593        };
594
595        let mut actions = port.handle_sync_timestamp(
596            id,
597            Time::from_fixed_nanos(U96F32::from_bits((601300 << 32) + (230 << 16))),
598        );
599
600        let Some(PortAction::SendGeneral {
601            data,
602            link_local: false,
603        }) = actions.next()
604        else {
605            panic!("Unexpected action");
606        };
607        assert!(actions.next().is_none());
608        drop(actions);
609
610        let follow = Message::deserialize(data).unwrap();
611        let follow_header = follow.header;
612
613        let follow = match follow.body {
614            MessageBody::FollowUp(msg) => msg,
615            _ => panic!("Unexpected message type"),
616        };
617
618        assert_eq!(sync_header.sequence_id, follow_header.sequence_id);
619        assert_eq!(
620            sync_header.correction_field,
621            TimeInterval(I48F16::from_bits(0))
622        );
623        assert_eq!(
624            follow.precise_origin_timestamp,
625            Time::from_fixed_nanos(601300).into()
626        );
627        assert_eq!(
628            follow_header.correction_field,
629            TimeInterval(I48F16::from_bits(230))
630        );
631
632        let mut actions = port.send_sync();
633
634        assert!(matches!(
635            actions.next(),
636            Some(PortAction::ResetSyncTimer { .. })
637        ));
638        let Some(PortAction::SendEvent {
639            context,
640            data,
641            link_local: false,
642        }) = actions.next()
643        else {
644            panic!("Unexpected action");
645        };
646        assert!(actions.next().is_none());
647        drop(actions);
648
649        let sync2 = Message::deserialize(data).unwrap();
650        let sync2_header = sync2.header;
651
652        let _sync2 = match sync2.body {
653            MessageBody::Sync(msg) => msg,
654            _ => panic!("Unexpected message type"),
655        };
656
657        let id = match context.inner {
658            TimestampContextInner::Sync { id } => id,
659            _ => panic!("wrong type of context"),
660        };
661
662        let mut actions = port.handle_sync_timestamp(
663            id,
664            Time::from_fixed_nanos(U96F32::from_bits((1000601300 << 32) + (543 << 16))),
665        );
666
667        let Some(PortAction::SendGeneral {
668            data,
669            link_local: false,
670        }) = actions.next()
671        else {
672            panic!("Unexpected action");
673        };
674        assert!(actions.next().is_none());
675
676        let follow2 = Message::deserialize(data).unwrap();
677        let follow2_header = follow2.header;
678
679        let follow2 = match follow2.body {
680            MessageBody::FollowUp(msg) => msg,
681            _ => panic!("Unexpected message type"),
682        };
683
684        assert_ne!(sync_header.sequence_id, sync2_header.sequence_id);
685        assert_eq!(sync2_header.sequence_id, follow2_header.sequence_id);
686        assert_eq!(
687            sync2_header.correction_field,
688            TimeInterval(I48F16::from_bits(0))
689        );
690        assert_eq!(
691            follow2.precise_origin_timestamp,
692            Time::from_fixed_nanos(1000601300).into()
693        );
694        assert_eq!(
695            follow2_header.correction_field,
696            TimeInterval(I48F16::from_bits(543))
697        );
698    }
699
700    #[test]
701    fn test_peer_delay() {
702        let state = setup_test_state();
703
704        let mut port = setup_test_port(&state);
705
706        let mut actions = port.handle_pdelay_req(Header::new(1), Time::from_micros(500));
707
708        let Some(PortAction::SendEvent {
709            context,
710            data,
711            link_local: true,
712        }) = actions.next()
713        else {
714            panic!("Unexpected action");
715        };
716
717        let response = Message::deserialize(data).unwrap();
718        let MessageBody::PDelayResp(response_body) = response.body else {
719            panic!("Unexpected message sent by port");
720        };
721        assert_eq!(
722            response_body.request_receive_timestamp,
723            Time::from_micros(500).into()
724        );
725        assert!(actions.next().is_none());
726        drop(actions);
727
728        let mut actions = port.handle_send_timestamp(context, Time::from_micros(550));
729
730        let Some(PortAction::SendGeneral {
731            data,
732            link_local: true,
733        }) = actions.next()
734        else {
735            panic!("Unexpected action");
736        };
737
738        let response = Message::deserialize(data).unwrap();
739        let MessageBody::PDelayRespFollowUp(response_body) = response.body else {
740            panic!("Unexpected message sent by port");
741        };
742        assert_eq!(
743            response_body.response_origin_timestamp,
744            Time::from_micros(550).into()
745        );
746        assert!(actions.next().is_none());
747        drop(actions);
748    }
749}