1use core::{cell::RefCell, ops::ControlFlow};
6
7pub use actions::{
8 ForwardedTLV, ForwardedTLVProvider, NoForwardedTLVs, PortAction, PortActionIterator,
9 TimestampContext,
10};
11pub use measurement::Measurement;
12use rand::Rng;
13use state::PortState;
14
15use self::sequence_id::SequenceIdGenerator;
16pub use crate::datastructures::messages::{
17 is_compatible as is_message_buffer_compatible, MAX_DATA_LEN,
18};
19#[cfg(doc)]
20use crate::PtpInstance;
21use crate::{
22 bmc::{
23 acceptable_master::AcceptableMasterList,
24 bmca::{BestAnnounceMessage, Bmca},
25 },
26 clock::Clock,
27 config::PortConfig,
28 datastructures::{
29 common::PortIdentity,
30 messages::{Message, MessageBody},
31 },
32 filters::{Filter, FilterEstimate},
33 observability::{self, port::PortDS},
34 ptp_instance::{PtpInstanceState, PtpInstanceStateMutex},
35 time::{Duration, Time},
36};
37
// Builds a `PortActionIterator` from zero, one or two `PortAction` expressions.
//
// Defined ahead of the `mod` declarations below: `macro_rules!` macros are
// scoped textually, so the submodules can only see it if it comes first.
macro_rules! actions {
    // No actions at all.
    [] => {{
        let queue = ::arrayvec::ArrayVec::new();
        crate::port::PortActionIterator::from(queue)
    }};
    // Exactly one action.
    [$only:expr] => {{
        let mut queue = ::arrayvec::ArrayVec::new();
        queue.push($only);
        crate::port::PortActionIterator::from(queue)
    }};
    // Two actions, yielded in the order they were written.
    [$first:expr, $second:expr] => {{
        let mut queue = ::arrayvec::ArrayVec::new();
        queue.push($first);
        queue.push($second);
        crate::port::PortActionIterator::from(queue)
    }};
}
61
62mod actions;
63mod bmca;
64mod master;
65mod measurement;
66mod sequence_id;
67mod slave;
68pub(crate) mod state;
69
/// A single port of a PTP instance.
///
/// The `L` parameter is a type-state marker: the impls in this file use
/// [`Running`] and [`InBmca`], and [`Port::start_bmca`] / [`Port::end_bmca`]
/// convert between the two.
///
/// The remaining parameters are the acceptable master list (`A`), the random
/// number generator (`R`), the clock (`C`), the filter (`F`) and the container
/// giving access to the shared instance state (`S`).
#[derive(Debug)]
pub struct Port<'a, L, A, R, C, F: Filter, S = RefCell<PtpInstanceState>> {
    /// Port configuration with the acceptable master list removed; the list
    /// itself is handed to `bmca` when the port is constructed.
    config: PortConfig<()>,
    /// Configuration used every time a fresh filter instance is created.
    filter_config: F::Config,
    /// Clock that is passed to the filter when it is updated or demobilized.
    clock: C,
    /// Identity of this port.
    pub(crate) port_identity: PortIdentity,
    /// Current protocol state of the port.
    port_state: PortState,
    /// Shared state of the PTP instance this port belongs to.
    instance_state: &'a S,
    /// Best master clock algorithm state for this port.
    bmca: Bmca<A>,
    /// Byte buffer of `MAX_DATA_LEN` bytes, re-zeroed on every lifecycle
    /// switch.
    // NOTE(review): presumably the scratch space for serializing outgoing
    // messages - its users are outside this part of the file, confirm.
    packet_buffer: [u8; MAX_DATA_LEN],
    /// Type-state value (`Running` or `InBmca`).
    lifecycle: L,
    /// Randomness source, used when computing the announce receipt timer
    /// duration.
    rng: R,
    // NOTE(review): not read or written in this part of the file beyond being
    // carried across lifecycle switches; initialised to `None` in `new`.
    multiport_disable: Option<Duration>,

    // One sequence id generator per kind of message the port originates.
    announce_seq_ids: SequenceIdGenerator,
    sync_seq_ids: SequenceIdGenerator,
    delay_seq_ids: SequenceIdGenerator,
    pdelay_seq_ids: SequenceIdGenerator,

    /// Filter instance currently in use; replaced by a fresh one on certain
    /// state changes (see `set_forced_port_state`).
    filter: F,
    /// Most recent mean delay reported by the filter, if any.
    mean_delay: Option<Duration>,
    /// Progress of the peer delay measurement.
    peer_delay_state: PeerDelayState,
}
305
/// Bookkeeping for a peer delay exchange.
// NOTE(review): the code driving these transitions lives outside this part of
// the file; the per-field descriptions below are inferred from the names and
// should be confirmed against it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PeerDelayState {
    /// No exchange is being tracked; this is the value a new port starts with.
    Empty,
    /// An exchange with the given `id` is being tracked. Every other piece of
    /// data is optional and can be filled in independently.
    Measuring {
        /// Identifier of the exchange (presumably the request's sequence id).
        id: u16,
        /// Identity of the responding port, once known.
        responder_identity: Option<PortIdentity>,
        /// Presumably: when our request left this port.
        request_send_time: Option<Time>,
        /// Presumably: when the peer received our request.
        request_recv_time: Option<Time>,
        /// Presumably: when the peer sent its response.
        response_send_time: Option<Time>,
        /// Presumably: when the response arrived at this port.
        response_recv_time: Option<Time>,
    },
    /// State kept after an exchange: only the id and the responder identity
    /// are retained.
    PostMeasurement {
        /// Identifier of the exchange.
        id: u16,
        /// Identity of the port that responded.
        responder_identity: PortIdentity,
    },
}
322
/// Type state of [`Port`] entered by [`Port::end_bmca`].
///
/// In this state the port offers the timer, timestamp and receive handlers,
/// and it can be moved to [`InBmca`] with [`Port::start_bmca`].
#[derive(Debug)]
pub struct Running;
326
/// Type state of [`Port`] entered by [`Port::start_bmca`] and by the port
/// constructor; it is left again through [`Port::end_bmca`].
#[derive(Debug)]
pub struct InBmca {
    /// Actions that [`Port::end_bmca`] hands back to the caller.
    pending_action: PortActionIterator<'static>,
    /// Starts out as `None` both in `new` and in `start_bmca`.
    // NOTE(review): presumably the best announce message this port has seen
    // during the current BMCA run - it is filled in outside this view.
    local_best: Option<BestAnnounceMessage>,
}
333
334impl<'a, A: AcceptableMasterList, C: Clock, F: Filter, R: Rng, S: PtpInstanceStateMutex>
335 Port<'a, Running, A, R, C, F, S>
336{
337 pub fn handle_send_timestamp(
342 &mut self,
343 context: TimestampContext,
344 timestamp: Time,
345 ) -> PortActionIterator<'_> {
346 match context.inner {
347 actions::TimestampContextInner::Sync { id } => {
348 self.handle_sync_timestamp(id, timestamp)
349 }
350 actions::TimestampContextInner::DelayReq { id } => {
351 self.handle_delay_timestamp(id, timestamp)
352 }
353 actions::TimestampContextInner::PDelayReq { id } => {
354 self.handle_pdelay_timestamp(id, timestamp)
355 }
356 actions::TimestampContextInner::PDelayResp {
357 id,
358 requestor_identity,
359 } => self.handle_pdelay_response_timestamp(id, requestor_identity, timestamp),
360 }
361 }
362
363 pub fn handle_announce_timer(
365 &mut self,
366 tlv_provider: &mut impl ForwardedTLVProvider,
367 ) -> PortActionIterator<'_> {
368 self.send_announce(tlv_provider)
369 }
370
371 pub fn handle_sync_timer(&mut self) -> PortActionIterator<'_> {
373 self.send_sync()
374 }
375
376 pub fn handle_delay_request_timer(&mut self) -> PortActionIterator<'_> {
378 self.send_delay_request()
379 }
380
381 pub fn handle_announce_receipt_timer(&mut self) -> PortActionIterator<'_> {
383 if self
384 .instance_state
385 .with_ref(|state| state.default_ds.slave_only)
386 {
387 if !matches!(self.port_state, PortState::Listening) {
390 self.set_forced_port_state(PortState::Listening);
391 }
392
393 let duration = self.config.announce_duration(&mut self.rng);
395 actions![PortAction::ResetAnnounceReceiptTimer { duration }]
396 } else {
397 match self.port_state {
400 PortState::Master => (),
401 _ => self.set_forced_port_state(PortState::Master),
402 }
403
404 actions![
406 PortAction::ResetAnnounceTimer {
407 duration: core::time::Duration::from_secs(0)
408 },
409 PortAction::ResetSyncTimer {
410 duration: core::time::Duration::from_secs(0)
411 }
412 ]
413 }
414 }
415
416 pub fn handle_filter_update_timer(&mut self) -> PortActionIterator {
418 let update = self.filter.update(&mut self.clock);
419 if update.mean_delay.is_some() {
420 self.mean_delay = update.mean_delay;
421 }
422 PortActionIterator::from_filter(update)
423 }
424
425 pub fn start_bmca(self) -> Port<'a, InBmca, A, R, C, F, S> {
428 Port {
429 port_state: self.port_state,
430 instance_state: self.instance_state,
431 config: self.config,
432 filter_config: self.filter_config,
433 clock: self.clock,
434 port_identity: self.port_identity,
435 bmca: self.bmca,
436 rng: self.rng,
437 multiport_disable: self.multiport_disable,
438 packet_buffer: [0; MAX_DATA_LEN],
439 lifecycle: InBmca {
440 pending_action: actions![],
441 local_best: None,
442 },
443 announce_seq_ids: self.announce_seq_ids,
444 sync_seq_ids: self.sync_seq_ids,
445 delay_seq_ids: self.delay_seq_ids,
446 pdelay_seq_ids: self.pdelay_seq_ids,
447
448 filter: self.filter,
449 mean_delay: self.mean_delay,
450 peer_delay_state: self.peer_delay_state,
451 }
452 }
453
454 fn parse_and_filter<'b>(
456 &mut self,
457 data: &'b [u8],
458 ) -> ControlFlow<PortActionIterator<'b>, Message<'b>> {
459 if !is_message_buffer_compatible(data) {
460 return ControlFlow::Break(actions![]);
462 }
463 let message = match Message::deserialize(data) {
464 Ok(message) => message,
465 Err(error) => {
466 log::warn!("Could not parse packet: {:?}", error);
467 return ControlFlow::Break(actions![]);
468 }
469 };
470 let domain_matches = self.instance_state.with_ref(|state| {
471 message.header().sdo_id == state.default_ds.sdo_id
472 && message.header().domain_number == state.default_ds.domain_number
473 });
474 if !domain_matches {
475 return ControlFlow::Break(actions![]);
476 }
477 ControlFlow::Continue(message)
478 }
479
480 pub fn handle_event_receive<'b>(
482 &'b mut self,
483 data: &'b [u8],
484 timestamp: Time,
485 ) -> PortActionIterator<'b> {
486 let message = match self.parse_and_filter(data) {
487 ControlFlow::Continue(value) => value,
488 ControlFlow::Break(value) => return value,
489 };
490
491 match message.body {
492 MessageBody::Sync(sync) => self.handle_sync(message.header, sync, timestamp),
493 MessageBody::DelayReq(delay_request) => {
494 self.handle_delay_req(message.header, delay_request, timestamp)
495 }
496 MessageBody::PDelayReq(_) => self.handle_pdelay_req(message.header, timestamp),
497 MessageBody::PDelayResp(peer_delay_response) => {
498 self.handle_peer_delay_response(message.header, peer_delay_response, timestamp)
499 }
500 _ => self.handle_general_internal(message),
501 }
502 }
503
504 pub fn handle_general_receive<'b>(&'b mut self, data: &'b [u8]) -> PortActionIterator<'b> {
506 let message = match self.parse_and_filter(data) {
507 ControlFlow::Continue(value) => value,
508 ControlFlow::Break(value) => return value,
509 };
510
511 self.handle_general_internal(message)
512 }
513
514 fn handle_general_internal<'b>(&'b mut self, message: Message<'b>) -> PortActionIterator<'b> {
515 match message.body {
516 MessageBody::Announce(announce) => self.handle_announce(&message, announce),
517 MessageBody::FollowUp(follow_up) => self.handle_follow_up(message.header, follow_up),
518 MessageBody::DelayResp(delay_response) => {
519 self.handle_delay_resp(message.header, delay_response)
520 }
521 MessageBody::PDelayRespFollowUp(peer_delay_follow_up) => {
522 self.handle_peer_delay_response_follow_up(message.header, peer_delay_follow_up)
523 }
524 MessageBody::Sync(_)
525 | MessageBody::DelayReq(_)
526 | MessageBody::PDelayReq(_)
527 | MessageBody::PDelayResp(_) => {
528 log::warn!("Received event message over general interface");
529 actions![]
530 }
531 MessageBody::Management(_) | MessageBody::Signaling(_) => actions![],
532 }
533 }
534}
535
536impl<'a, A, C, F: Filter, R, S> Port<'a, InBmca, A, R, C, F, S> {
537 pub fn end_bmca(
540 self,
541 ) -> (
542 Port<'a, Running, A, R, C, F, S>,
543 PortActionIterator<'static>,
544 ) {
545 (
546 Port {
547 port_state: self.port_state,
548 instance_state: self.instance_state,
549 config: self.config,
550 filter_config: self.filter_config,
551 clock: self.clock,
552 port_identity: self.port_identity,
553 bmca: self.bmca,
554 rng: self.rng,
555 multiport_disable: self.multiport_disable,
556 packet_buffer: [0; MAX_DATA_LEN],
557 lifecycle: Running,
558 announce_seq_ids: self.announce_seq_ids,
559 sync_seq_ids: self.sync_seq_ids,
560 delay_seq_ids: self.delay_seq_ids,
561 pdelay_seq_ids: self.pdelay_seq_ids,
562 filter: self.filter,
563 mean_delay: self.mean_delay,
564 peer_delay_state: self.peer_delay_state,
565 },
566 self.lifecycle.pending_action,
567 )
568 }
569}
570
571impl<L, A, R, C: Clock, F: Filter, S> Port<'_, L, A, R, C, F, S> {
572 fn set_forced_port_state(&mut self, mut state: PortState) {
573 log::info!(
574 "new state for port {}: {} -> {}",
575 self.port_identity.port_number,
576 self.port_state,
577 state
578 );
579 core::mem::swap(&mut self.port_state, &mut state);
580 if matches!(state, PortState::Slave(_) | PortState::Faulty)
581 || matches!(self.port_state, PortState::Faulty)
582 {
583 let mut filter = F::new(self.filter_config.clone());
584 core::mem::swap(&mut filter, &mut self.filter);
585 filter.demobilize(&mut self.clock);
586 }
587 }
588}
589
590impl<L, A, R, C, F: Filter, S> Port<'_, L, A, R, C, F, S> {
591 pub fn is_steering(&self) -> bool {
593 matches!(self.port_state, PortState::Slave(_))
594 }
595
596 pub fn is_master(&self) -> bool {
598 matches!(self.port_state, PortState::Master)
599 }
600
601 pub(crate) fn state(&self) -> &PortState {
602 &self.port_state
603 }
604
605 pub(crate) fn number(&self) -> u16 {
606 self.port_identity.port_number
607 }
608
609 pub fn port_ds(&self) -> PortDS {
611 PortDS {
612 port_identity: self.port_identity,
613 port_state: match self.port_state {
614 PortState::Faulty => observability::port::PortState::Faulty,
615 PortState::Listening => observability::port::PortState::Listening,
616 PortState::Master => observability::port::PortState::Master,
617 PortState::Passive => observability::port::PortState::Passive,
618 PortState::Slave(_) => observability::port::PortState::Slave,
619 },
620 log_announce_interval: self.config.announce_interval.as_log_2(),
621 announce_receipt_timeout: self.config.announce_receipt_timeout,
622 log_sync_interval: self.config.sync_interval.as_log_2(),
623 delay_mechanism: match self.config.delay_mechanism {
624 crate::config::DelayMechanism::E2E { interval } => {
625 observability::port::DelayMechanism::E2E {
626 log_min_delay_req_interval: interval.as_log_2(),
627 }
628 }
629 crate::config::DelayMechanism::P2P { interval } => {
630 observability::port::DelayMechanism::P2P {
631 log_min_p_delay_req_interval: interval.as_log_2(),
632 mean_link_delay: self.mean_delay.map(|v| v.into()).unwrap_or_default(),
633 }
634 }
635 },
636 version_number: 2,
637 minor_version_number: self.config.minor_ptp_version as u8,
638 delay_asymmetry: self.config.delay_asymmetry.into(),
639 master_only: self.config.master_only,
640 }
641 }
642
643 pub fn port_current_ds_contribution(&self) -> Option<FilterEstimate> {
646 if matches!(self.port_state, PortState::Slave(_)) {
647 Some(self.filter.current_estimates())
648 } else {
649 None
650 }
651 }
652}
653
654impl<'a, A, C, F: Filter, R: Rng, S: PtpInstanceStateMutex> Port<'a, InBmca, A, R, C, F, S> {
655 pub(crate) fn new(
657 instance_state: &'a S,
658 config: PortConfig<A>,
659 filter_config: F::Config,
660 clock: C,
661 port_identity: PortIdentity,
662 mut rng: R,
663 ) -> Self {
664 let duration = config.announce_duration(&mut rng);
665 let bmca = Bmca::new(
666 config.acceptable_master_list,
667 config.announce_interval.as_duration().into(),
668 port_identity,
669 );
670
671 let filter = F::new(filter_config.clone());
672
673 Port {
674 config: PortConfig {
675 acceptable_master_list: (),
676 delay_mechanism: config.delay_mechanism,
677 announce_interval: config.announce_interval,
678 announce_receipt_timeout: config.announce_receipt_timeout,
679 sync_interval: config.sync_interval,
680 master_only: config.master_only,
681 delay_asymmetry: config.delay_asymmetry,
682 minor_ptp_version: config.minor_ptp_version,
683 },
684 filter_config,
685 clock,
686 port_identity,
687 port_state: PortState::Listening,
688 instance_state,
689 bmca,
690 rng,
691 multiport_disable: None,
692 packet_buffer: [0; MAX_DATA_LEN],
693 lifecycle: InBmca {
694 pending_action: actions![PortAction::ResetAnnounceReceiptTimer { duration }],
695 local_best: None,
696 },
697 announce_seq_ids: SequenceIdGenerator::new(),
698 sync_seq_ids: SequenceIdGenerator::new(),
699 delay_seq_ids: SequenceIdGenerator::new(),
700 pdelay_seq_ids: SequenceIdGenerator::new(),
701 filter,
702 mean_delay: None,
703 peer_delay_state: PeerDelayState::Empty,
704 }
705 }
706}
707
#[cfg(test)]
mod tests {
    use core::cell::RefCell;

    use super::*;
    use crate::{
        config::{
            AcceptAnyMaster, DelayMechanism, InstanceConfig, PtpMinorVersion, TimePropertiesDS,
        },
        datastructures::datasets::{InternalDefaultDS, InternalParentDS, PathTraceDS},
        filters::BasicFilter,
        time::{Duration, Interval, Time},
        Clock,
    };

    /// Clock stub for tests: every adjustment succeeds and reports the default
    /// time, while asking for the current time is treated as a test bug.
    pub(super) struct TestClock;

    impl Clock for TestClock {
        type Error = ();

        fn set_frequency(&mut self, _freq: f64) -> Result<Time, Self::Error> {
            Ok(Time::default())
        }

        fn now(&self) -> Time {
            panic!("Shouldn't be called");
        }

        fn set_properties(
            &mut self,
            _time_properties_ds: &TimePropertiesDS,
        ) -> Result<(), Self::Error> {
            Ok(())
        }

        fn step_clock(&mut self, _offset: Duration) -> Result<Time, Self::Error> {
            Ok(Time::default())
        }
    }

    /// The port configuration shared by all test ports.
    fn test_port_config() -> PortConfig<AcceptAnyMaster> {
        PortConfig {
            acceptable_master_list: AcceptAnyMaster,
            delay_mechanism: DelayMechanism::E2E {
                interval: Interval::from_log_2(1),
            },
            announce_interval: Interval::from_log_2(1),
            announce_receipt_timeout: 3,
            sync_interval: Interval::from_log_2(0),
            master_only: false,
            delay_asymmetry: Duration::ZERO,
            minor_ptp_version: PtpMinorVersion::One,
        }
    }

    /// Build a port with the shared test configuration and a deterministic
    /// rng, and take it straight out of BMCA (discarding the pending actions)
    /// so that it is returned in the `Running` state.
    fn running_test_port<F: Filter>(
        state: &RefCell<PtpInstanceState>,
        filter_config: F::Config,
        port_identity: PortIdentity,
    ) -> Port<'_, Running, AcceptAnyMaster, rand::rngs::mock::StepRng, TestClock, F> {
        let port = Port::<_, _, _, _, F>::new(
            state,
            test_port_config(),
            filter_config,
            TestClock,
            port_identity,
            rand::rngs::mock::StepRng::new(2, 1),
        );

        let (port, _) = port.end_bmca();
        port
    }

    /// Running test port with the default port identity and a `BasicFilter`
    /// configured with `0.25`.
    pub(super) fn setup_test_port(
        state: &RefCell<PtpInstanceState>,
    ) -> Port<'_, Running, AcceptAnyMaster, rand::rngs::mock::StepRng, TestClock, BasicFilter> {
        running_test_port::<BasicFilter>(state, 0.25, Default::default())
    }

    /// Like [`setup_test_port`], but with a caller-chosen port identity.
    pub(super) fn setup_test_port_custom_identity(
        state: &RefCell<PtpInstanceState>,
        port_identity: PortIdentity,
    ) -> Port<'_, Running, AcceptAnyMaster, rand::rngs::mock::StepRng, TestClock, BasicFilter> {
        running_test_port::<BasicFilter>(state, 0.25, port_identity)
    }

    /// Running test port with the default port identity and a caller-chosen
    /// filter type and filter configuration.
    pub(super) fn setup_test_port_custom_filter<F: Filter>(
        state: &RefCell<PtpInstanceState>,
        filter_config: F::Config,
    ) -> Port<'_, Running, AcceptAnyMaster, rand::rngs::mock::StepRng, TestClock, F> {
        running_test_port::<F>(state, filter_config, Default::default())
    }

    /// Instance state for tests: domain 0, both priorities 255, not
    /// slave-only, path tracing disabled, and defaults everywhere else.
    pub(super) fn setup_test_state() -> RefCell<PtpInstanceState> {
        let default_ds = InternalDefaultDS::new(InstanceConfig {
            clock_identity: Default::default(),
            priority_1: 255,
            priority_2: 255,
            domain_number: 0,
            slave_only: false,
            sdo_id: Default::default(),
            path_trace: false,
        });

        let parent_ds = InternalParentDS::new(default_ds);

        RefCell::new(PtpInstanceState {
            default_ds,
            current_ds: Default::default(),
            parent_ds,
            time_properties_ds: Default::default(),
            path_trace_ds: PathTraceDS::new(false),
        })
    }
}