statime/port/
master.rs

1use arrayvec::ArrayVec;
2
3use super::{state::PortState, ForwardedTLVProvider, Port, PortActionIterator, Running};
4use crate::{
5    datastructures::{
6        common::{PortIdentity, Tlv, TlvSetBuilder, TlvType},
7        messages::{DelayReqMessage, Header, Message, MAX_DATA_LEN},
8    },
9    filters::Filter,
10    port::{actions::TimestampContextInner, PortAction, TimestampContext},
11    ptp_instance::PtpInstanceStateMutex,
12    time::Time,
13};
14
15impl<A, C, F: Filter, R, S: PtpInstanceStateMutex> Port<'_, Running, A, R, C, F, S> {
16    pub(super) fn send_sync(&mut self) -> PortActionIterator {
17        if matches!(self.port_state, PortState::Master) {
18            log::trace!("sending sync message");
19
20            let seq_id = self.sync_seq_ids.generate();
21            let packet_length = match self
22                .instance_state
23                .with_ref(|state| {
24                    Message::sync(
25                        &state.default_ds,
26                        self.port_identity,
27                        seq_id,
28                        self.config.minor_ptp_version.into(),
29                    )
30                })
31                .serialize(&mut self.packet_buffer)
32            {
33                Ok(message) => message,
34                Err(error) => {
35                    log::error!("Statime bug: Could not serialize sync: {:?}", error);
36                    return actions![];
37                }
38            };
39
40            actions![
41                PortAction::ResetSyncTimer {
42                    duration: self.config.sync_interval.as_core_duration(),
43                },
44                PortAction::SendEvent {
45                    context: TimestampContext {
46                        inner: TimestampContextInner::Sync { id: seq_id },
47                    },
48                    data: &self.packet_buffer[..packet_length],
49                    link_local: false,
50                }
51            ]
52        } else {
53            actions![]
54        }
55    }
56
57    pub(super) fn handle_sync_timestamp(&mut self, id: u16, timestamp: Time) -> PortActionIterator {
58        if matches!(self.port_state, PortState::Master) {
59            let packet_length = match self
60                .instance_state
61                .with_ref(|state| {
62                    Message::follow_up(
63                        &state.default_ds,
64                        self.port_identity,
65                        id,
66                        timestamp,
67                        self.config.minor_ptp_version.into(),
68                    )
69                })
70                .serialize(&mut self.packet_buffer)
71            {
72                Ok(length) => length,
73                Err(error) => {
74                    log::error!(
75                        "Statime bug: Could not serialize sync follow up {:?}",
76                        error
77                    );
78                    return actions![];
79                }
80            };
81
82            actions![PortAction::SendGeneral {
83                data: &self.packet_buffer[..packet_length],
84                link_local: false,
85            }]
86        } else {
87            actions![]
88        }
89    }
90
91    pub(super) fn send_announce(
92        &mut self,
93        tlv_provider: &mut impl ForwardedTLVProvider,
94    ) -> PortActionIterator {
95        if matches!(self.port_state, PortState::Master) {
96            log::trace!("sending announce message");
97
98            let mut tlv_buffer = [0; MAX_DATA_LEN];
99            let mut tlv_builder = TlvSetBuilder::new(&mut tlv_buffer);
100
101            let mut message = self.instance_state.with_ref(|state| {
102                Message::announce(
103                    state,
104                    self.port_identity,
105                    self.announce_seq_ids.generate(),
106                    self.config.minor_ptp_version.into(),
107                )
108            });
109            let mut tlv_margin = MAX_DATA_LEN - message.wire_size();
110
111            let path_trace_enabled = self.instance_state.with_ref(|state| {
112                let default_ds = &state.default_ds;
113                let path_trace_ds = &state.path_trace_ds;
114                if path_trace_ds.enable {
115                    'path_trace: {
116                        let mut path = path_trace_ds.list.clone();
117                        if path.try_push(default_ds.clock_identity).is_err() {
118                            break 'path_trace;
119                        }
120
121                        let value: ArrayVec<u8, MAX_DATA_LEN> =
122                            path.into_iter().flat_map(|ci| ci.0).collect();
123                        let tlv = Tlv {
124                            tlv_type: TlvType::PathTrace,
125                            value: value.as_slice().into(),
126                        };
127
128                        let tlv_size = tlv.wire_size();
129                        if tlv_margin > tlv_size {
130                            tlv_margin -= tlv_size;
131                            // Will not fail as previous checks ensure sufficient space in buffer.
132                            tlv_builder.add(tlv).unwrap();
133                        }
134                    }
135                }
136
137                path_trace_ds.enable
138            });
139
140            while let Some(tlv) = tlv_provider.next_if_smaller(tlv_margin) {
141                assert!(tlv.size() < tlv_margin);
142                let parent_port_identity = self
143                    .instance_state
144                    .with_ref(|s| s.parent_ds.parent_port_identity);
145                if parent_port_identity != tlv.sender_identity {
146                    // Ignore, shouldn't be forwarded
147                    continue;
148                }
149
150                // Don't forward PATH_TRACE TLVs, we processed them and added our own
151                if path_trace_enabled && tlv.tlv.tlv_type == TlvType::PathTrace {
152                    continue;
153                }
154
155                tlv_margin -= tlv.size();
156                // Will not fail as previous checks ensure sufficient space in buffer.
157                tlv_builder.add(tlv.tlv).unwrap();
158            }
159
160            message.suffix = tlv_builder.build();
161
162            let packet_length = match message.serialize(&mut self.packet_buffer) {
163                Ok(length) => length,
164                Err(error) => {
165                    log::error!(
166                        "Statime bug: Could not serialize announce message {:?}",
167                        error
168                    );
169                    return actions![];
170                }
171            };
172
173            actions![
174                PortAction::ResetAnnounceTimer {
175                    duration: self.config.announce_interval.as_core_duration(),
176                },
177                PortAction::SendGeneral {
178                    data: &self.packet_buffer[..packet_length],
179                    link_local: false,
180                }
181            ]
182        } else {
183            actions![]
184        }
185    }
186
187    pub(super) fn handle_delay_req(
188        &mut self,
189        header: Header,
190        message: DelayReqMessage,
191        timestamp: Time,
192    ) -> PortActionIterator {
193        if matches!(self.port_state, PortState::Master) {
194            log::debug!("Received DelayReq");
195            let delay_resp_message = Message::delay_resp(
196                header,
197                message,
198                self.port_identity,
199                self.config.min_delay_req_interval(),
200                timestamp,
201            );
202
203            let packet_length = match delay_resp_message.serialize(&mut self.packet_buffer) {
204                Ok(length) => length,
205                Err(error) => {
206                    log::error!("Could not serialize delay response: {:?}", error);
207                    return actions![];
208                }
209            };
210
211            actions![PortAction::SendGeneral {
212                data: &self.packet_buffer[..packet_length],
213                link_local: false,
214            }]
215        } else {
216            actions![]
217        }
218    }
219
220    pub(super) fn handle_pdelay_req(
221        &mut self,
222        header: Header,
223        timestamp: Time,
224    ) -> PortActionIterator {
225        log::debug!("Received PDelayReq");
226        let pdelay_resp_message = self.instance_state.with_ref(|state| {
227            Message::pdelay_resp(
228                &state.default_ds,
229                self.port_identity,
230                header,
231                timestamp,
232                self.config.minor_ptp_version.into(),
233            )
234        });
235
236        let packet_length = match pdelay_resp_message.serialize(&mut self.packet_buffer) {
237            Ok(length) => length,
238            Err(error) => {
239                log::error!("Could not serialize pdelay response: {:?}", error);
240                return actions![];
241            }
242        };
243
244        actions![PortAction::SendEvent {
245            data: &self.packet_buffer[..packet_length],
246            context: TimestampContext {
247                inner: TimestampContextInner::PDelayResp {
248                    id: header.sequence_id,
249                    requestor_identity: header.source_port_identity
250                }
251            },
252            link_local: true,
253        }]
254    }
255
256    pub(super) fn handle_pdelay_response_timestamp(
257        &mut self,
258        id: u16,
259        requestor_identity: PortIdentity,
260        timestamp: Time,
261    ) -> PortActionIterator {
262        let pdelay_resp_follow_up_messgae = self.instance_state.with_ref(|state| {
263            Message::pdelay_resp_follow_up(
264                &state.default_ds,
265                self.port_identity,
266                requestor_identity,
267                id,
268                timestamp,
269                self.config.minor_ptp_version.into(),
270            )
271        });
272
273        let packet_length = match pdelay_resp_follow_up_messgae.serialize(&mut self.packet_buffer) {
274            Ok(length) => length,
275            Err(error) => {
276                log::error!("Could not serialize pdelay_response_followup: {:?}", error);
277                return actions![];
278            }
279        };
280
281        actions![PortAction::SendGeneral {
282            data: &self.packet_buffer[..packet_length],
283            link_local: true,
284        }]
285    }
286}
287
288#[cfg(test)]
289mod tests {
290    use fixed::types::{I48F16, U96F32};
291
292    use super::*;
293    use crate::{
294        config::DelayMechanism,
295        datastructures::{
296            common::{PortIdentity, TimeInterval},
297            datasets::PathTraceDS,
298            messages::{Header, MessageBody},
299        },
300        port::{
301            tests::{setup_test_port, setup_test_state},
302            NoForwardedTLVs,
303        },
304        time::Interval,
305    };
306
307    #[test]
308    fn test_delay_response() {
309        let state = setup_test_state();
310
311        let mut port = setup_test_port(&state);
312
313        port.set_forced_port_state(PortState::Master);
314
315        port.config.delay_mechanism = DelayMechanism::E2E {
316            interval: Interval::from_log_2(2),
317        };
318
319        let mut action = port.handle_delay_req(
320            Header {
321                sequence_id: 5123,
322                source_port_identity: PortIdentity {
323                    port_number: 83,
324                    ..Default::default()
325                },
326                correction_field: TimeInterval(I48F16::from_bits(400)),
327                ..Header::new(1)
328            },
329            DelayReqMessage {
330                origin_timestamp: Time::from_micros(0).into(),
331            },
332            Time::from_fixed_nanos(U96F32::from_bits((200000 << 32) + (500 << 16))),
333        );
334
335        let Some(PortAction::SendGeneral {
336            data,
337            link_local: false,
338        }) = action.next()
339        else {
340            panic!("Unexpected resulting action");
341        };
342        assert!(action.next().is_none());
343        drop(action);
344
345        let msg = Message::deserialize(data).unwrap();
346        let msg_header = msg.header;
347
348        let msg = match msg.body {
349            MessageBody::DelayResp(msg) => msg,
350            _ => panic!("Unexpected message type"),
351        };
352
353        assert_eq!(
354            msg.requesting_port_identity,
355            PortIdentity {
356                port_number: 83,
357                ..Default::default()
358            }
359        );
360        assert_eq!(msg_header.sequence_id, 5123);
361        assert_eq!(msg.receive_timestamp, Time::from_micros(200).into());
362        assert_eq!(msg_header.log_message_interval, 2);
363        assert_eq!(
364            msg_header.correction_field,
365            TimeInterval(I48F16::from_bits(900))
366        );
367
368        port.config.delay_mechanism = DelayMechanism::E2E {
369            interval: Interval::from_log_2(5),
370        };
371
372        let mut action = port.handle_delay_req(
373            Header {
374                sequence_id: 879,
375                source_port_identity: PortIdentity {
376                    port_number: 12,
377                    ..Default::default()
378                },
379                correction_field: TimeInterval(I48F16::from_bits(200)),
380                ..Header::new(1)
381            },
382            DelayReqMessage {
383                origin_timestamp: Time::from_micros(0).into(),
384            },
385            Time::from_fixed_nanos(U96F32::from_bits((220000 << 32) + (300 << 16))),
386        );
387
388        let Some(PortAction::SendGeneral {
389            data,
390            link_local: false,
391        }) = action.next()
392        else {
393            panic!("Unexpected resulting action");
394        };
395        assert!(action.next().is_none());
396
397        let msg = Message::deserialize(data).unwrap();
398        let msg_header = msg.header;
399
400        let msg = match msg.body {
401            MessageBody::DelayResp(msg) => msg,
402            _ => panic!("Unexpected message type"),
403        };
404
405        assert_eq!(
406            msg.requesting_port_identity,
407            PortIdentity {
408                port_number: 12,
409                ..Default::default()
410            }
411        );
412        assert_eq!(msg_header.sequence_id, 879);
413        assert_eq!(msg.receive_timestamp, Time::from_micros(220).into());
414        assert_eq!(msg_header.log_message_interval, 5);
415        assert_eq!(
416            msg_header.correction_field,
417            TimeInterval(I48F16::from_bits(500))
418        );
419    }
420
421    #[test]
422    fn test_announce() {
423        let state = setup_test_state();
424
425        let mut state_ref = state.borrow_mut();
426        state_ref.default_ds.priority_1 = 15;
427        state_ref.default_ds.priority_2 = 128;
428        state_ref.parent_ds.grandmaster_priority_1 = 15;
429        state_ref.parent_ds.grandmaster_priority_2 = 128;
430
431        drop(state_ref);
432
433        let mut port = setup_test_port(&state);
434
435        port.set_forced_port_state(PortState::Master);
436
437        let mut actions = port.send_announce(&mut NoForwardedTLVs);
438
439        assert!(matches!(
440            actions.next(),
441            Some(PortAction::ResetAnnounceTimer { .. })
442        ));
443        let Some(PortAction::SendGeneral {
444            data,
445            link_local: false,
446        }) = actions.next()
447        else {
448            panic!("Unexpected action");
449        };
450        assert!(actions.next().is_none());
451        drop(actions);
452
453        let msg = Message::deserialize(data).unwrap();
454        let msg_header = msg.header;
455
456        let msg_body = match msg.body {
457            MessageBody::Announce(msg) => msg,
458            _ => panic!("Unexpected message type"),
459        };
460
461        assert_eq!(msg_body.grandmaster_priority_1, 15);
462        assert_eq!(msg.suffix, Default::default());
463
464        let mut actions = port.send_announce(&mut NoForwardedTLVs);
465
466        assert!(matches!(
467            actions.next(),
468            Some(PortAction::ResetAnnounceTimer { .. })
469        ));
470        let Some(PortAction::SendGeneral {
471            data,
472            link_local: false,
473        }) = actions.next()
474        else {
475            panic!("Unexpected action");
476        };
477        assert!(actions.next().is_none());
478
479        let msg2 = Message::deserialize(data).unwrap();
480        let msg2_header = msg2.header;
481
482        let msg2_body = match msg2.body {
483            MessageBody::Announce(msg) => msg,
484            _ => panic!("Unexpected message type"),
485        };
486
487        assert_eq!(msg2_body.grandmaster_priority_1, 15);
488        assert_eq!(msg2.suffix, Default::default());
489        assert_ne!(msg2_header.sequence_id, msg_header.sequence_id);
490    }
491
492    #[test]
493    fn test_announce_path_trace() {
494        let state = setup_test_state();
495
496        let mut state_ref = state.borrow_mut();
497        state_ref.default_ds.priority_1 = 15;
498        state_ref.default_ds.priority_2 = 128;
499        state_ref.parent_ds.grandmaster_priority_1 = 15;
500        state_ref.parent_ds.grandmaster_priority_2 = 128;
501        state_ref.path_trace_ds = PathTraceDS::new(true);
502
503        drop(state_ref);
504
505        let mut port = setup_test_port(&state);
506
507        port.set_forced_port_state(PortState::Master);
508
509        let mut actions = port.send_announce(&mut NoForwardedTLVs);
510
511        assert!(matches!(
512            actions.next(),
513            Some(PortAction::ResetAnnounceTimer { .. })
514        ));
515        let Some(PortAction::SendGeneral {
516            data,
517            link_local: false,
518        }) = actions.next()
519        else {
520            panic!("Unexpected action");
521        };
522        assert!(actions.next().is_none());
523        drop(actions);
524
525        let msg = Message::deserialize(data).unwrap();
526
527        let msg_body = match msg.body {
528            MessageBody::Announce(msg) => msg,
529            _ => panic!("Unexpected message type"),
530        };
531
532        assert_eq!(msg_body.grandmaster_priority_1, 15);
533
534        let mut tlvs = msg.suffix.tlv();
535        let Some(Tlv {
536            tlv_type: TlvType::PathTrace,
537            value,
538        }) = tlvs.next()
539        else {
540            panic!("Unexpected or missing TLV")
541        };
542        assert_eq!(value.as_ref(), [0; 8].as_ref());
543        assert!(tlvs.next().is_none());
544    }
545
546    #[test]
547    fn test_sync() {
548        let state = setup_test_state();
549
550        let mut state_ref = state.borrow_mut();
551        state_ref.default_ds.priority_1 = 15;
552        state_ref.default_ds.priority_2 = 128;
553        state_ref.parent_ds.grandmaster_priority_1 = 15;
554        state_ref.parent_ds.grandmaster_priority_2 = 128;
555
556        drop(state_ref);
557
558        let mut port = setup_test_port(&state);
559
560        port.set_forced_port_state(PortState::Master);
561        let mut actions = port.send_sync();
562
563        assert!(matches!(
564            actions.next(),
565            Some(PortAction::ResetSyncTimer { .. })
566        ));
567        let Some(PortAction::SendEvent {
568            context,
569            data,
570            link_local: false,
571        }) = actions.next()
572        else {
573            panic!("Unexpected action");
574        };
575        assert!(actions.next().is_none());
576        drop(actions);
577
578        let sync = Message::deserialize(data).unwrap();
579        let sync_header = sync.header;
580
581        let _sync = match sync.body {
582            MessageBody::Sync(msg) => msg,
583            _ => panic!("Unexpected message type"),
584        };
585
586        let id = match context.inner {
587            TimestampContextInner::Sync { id } => id,
588            _ => panic!("Wrong type of context"),
589        };
590
591        let mut actions = port.handle_sync_timestamp(
592            id,
593            Time::from_fixed_nanos(U96F32::from_bits((601300 << 32) + (230 << 16))),
594        );
595
596        let Some(PortAction::SendGeneral {
597            data,
598            link_local: false,
599        }) = actions.next()
600        else {
601            panic!("Unexpected action");
602        };
603        assert!(actions.next().is_none());
604        drop(actions);
605
606        let follow = Message::deserialize(data).unwrap();
607        let follow_header = follow.header;
608
609        let follow = match follow.body {
610            MessageBody::FollowUp(msg) => msg,
611            _ => panic!("Unexpected message type"),
612        };
613
614        assert_eq!(sync_header.sequence_id, follow_header.sequence_id);
615        assert_eq!(
616            sync_header.correction_field,
617            TimeInterval(I48F16::from_bits(0))
618        );
619        assert_eq!(
620            follow.precise_origin_timestamp,
621            Time::from_fixed_nanos(601300).into()
622        );
623        assert_eq!(
624            follow_header.correction_field,
625            TimeInterval(I48F16::from_bits(230))
626        );
627
628        let mut actions = port.send_sync();
629
630        assert!(matches!(
631            actions.next(),
632            Some(PortAction::ResetSyncTimer { .. })
633        ));
634        let Some(PortAction::SendEvent {
635            context,
636            data,
637            link_local: false,
638        }) = actions.next()
639        else {
640            panic!("Unexpected action");
641        };
642        assert!(actions.next().is_none());
643        drop(actions);
644
645        let sync2 = Message::deserialize(data).unwrap();
646        let sync2_header = sync2.header;
647
648        let _sync2 = match sync2.body {
649            MessageBody::Sync(msg) => msg,
650            _ => panic!("Unexpected message type"),
651        };
652
653        let id = match context.inner {
654            TimestampContextInner::Sync { id } => id,
655            _ => panic!("wrong type of context"),
656        };
657
658        let mut actions = port.handle_sync_timestamp(
659            id,
660            Time::from_fixed_nanos(U96F32::from_bits((1000601300 << 32) + (543 << 16))),
661        );
662
663        let Some(PortAction::SendGeneral {
664            data,
665            link_local: false,
666        }) = actions.next()
667        else {
668            panic!("Unexpected action");
669        };
670        assert!(actions.next().is_none());
671
672        let follow2 = Message::deserialize(data).unwrap();
673        let follow2_header = follow2.header;
674
675        let follow2 = match follow2.body {
676            MessageBody::FollowUp(msg) => msg,
677            _ => panic!("Unexpected message type"),
678        };
679
680        assert_ne!(sync_header.sequence_id, sync2_header.sequence_id);
681        assert_eq!(sync2_header.sequence_id, follow2_header.sequence_id);
682        assert_eq!(
683            sync2_header.correction_field,
684            TimeInterval(I48F16::from_bits(0))
685        );
686        assert_eq!(
687            follow2.precise_origin_timestamp,
688            Time::from_fixed_nanos(1000601300).into()
689        );
690        assert_eq!(
691            follow2_header.correction_field,
692            TimeInterval(I48F16::from_bits(543))
693        );
694    }
695
696    #[test]
697    fn test_peer_delay() {
698        let state = setup_test_state();
699
700        let mut port = setup_test_port(&state);
701
702        let mut actions = port.handle_pdelay_req(Header::new(1), Time::from_micros(500));
703
704        let Some(PortAction::SendEvent {
705            context,
706            data,
707            link_local: true,
708        }) = actions.next()
709        else {
710            panic!("Unexpected action");
711        };
712
713        let response = Message::deserialize(data).unwrap();
714        let MessageBody::PDelayResp(response_body) = response.body else {
715            panic!("Unexpected message sent by port");
716        };
717        assert_eq!(
718            response_body.request_receive_timestamp,
719            Time::from_micros(500).into()
720        );
721        drop(response);
722        assert!(actions.next().is_none());
723        drop(actions);
724
725        let mut actions = port.handle_send_timestamp(context, Time::from_micros(550));
726
727        let Some(PortAction::SendGeneral {
728            data,
729            link_local: true,
730        }) = actions.next()
731        else {
732            panic!("Unexpected action");
733        };
734
735        let response = Message::deserialize(data).unwrap();
736        let MessageBody::PDelayRespFollowUp(response_body) = response.body else {
737            panic!("Unexpected message sent by port");
738        };
739        assert_eq!(
740            response_body.response_origin_timestamp,
741            Time::from_micros(550).into()
742        );
743        drop(response);
744        assert!(actions.next().is_none());
745        drop(actions);
746    }
747}