Skip to content

uncertainty

cosmica.comm_link.uncertainty

Models for the uncertainty parameters in the communication link.

__all__ module-attribute

__all__ = [
    "ApertureAveragedLogNormalScintillationModel",
    "AtmosphericScintillationModel",
    "BinaryCloudModel",
    "CloudStates",
    "EdgeFailureModel",
    "ExpEdgeModel",
]

ApertureAveragedLogNormalScintillationModel dataclass

ApertureAveragedLogNormalScintillationModel(
    *,
    default_rytov_variance: float,
    wavelength: float,
    aperture_diameter: float
)

Bases: AtmosphericScintillationModel[bool_]

aperture_diameter instance-attribute

aperture_diameter: float

default_rytov_variance instance-attribute

default_rytov_variance: float

k_number cached property

k_number: float

wavelength instance-attribute

wavelength: float

sample

sample(
    rng: Generator,
    link_distance: float,
    rytov_variance: float | None = None,
) -> float
Source code in src/cosmica/comm_link/uncertainty.py
170
171
172
173
174
175
176
177
def sample(
    self,
    rng: np.random.Generator,
    link_distance: float,
    rytov_variance: float | None = None,
) -> float:
    sigma2_scintillation = self.sigma2_scintillation(link_distance, rytov_variance)
    return rng.lognormal(-sigma2_scintillation / 2, sigma2_scintillation)

scaled_aperture

scaled_aperture(link_distance: float) -> float
Source code in src/cosmica/comm_link/uncertainty.py
146
147
def scaled_aperture(self, link_distance: float) -> float:
    return np.sqrt((self.k_number * self.aperture_diameter**2) / (4 * link_distance))

sigma2_scintillation

sigma2_scintillation(
    link_distance: float,
    ryotv_variance: float | None = None,
) -> float
RETURNS DESCRIPTION
float

Float approximation for aperture averaged scintillation for a plane-wave in the absence of inner scale andouter scale effects as described by Larry C. Andrews and Ronald L. Phillips in the book "Laser BeamPropagation through Random Media" Chapter 10.

Source code in src/cosmica/comm_link/uncertainty.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def sigma2_scintillation(
    self,
    link_distance: float,
    ryotv_variance: float | None = None,
) -> Annotated[
    float,
    Doc(
        "Float approximation for aperture averaged scintillation for a plane-wave in the absence of inner scale and"
        'outer scale effects as described by Larry C. Andrews and Ronald L. Phillips in the book "Laser Beam'
        'Propagation through Random Media" Chapter 10.',
    ),
]:
    if ryotv_variance is not None and self.default_rytov_variance != ryotv_variance:
        ryotv_variance = self.default_rytov_variance
    scaled_aperture = self.scaled_aperture(link_distance=link_distance)
    num1 = 0.49 * self.default_rytov_variance
    den1 = (1 + 0.65 * scaled_aperture**2 + 1.11 * self.default_rytov_variance ** (6 / 5)) ** (7 / 6)
    num2 = 0.51 * self.default_rytov_variance * (1 + 0.69 * self.default_rytov_variance ** (6 / 5)) ** (-5 / 6)
    den2 = 1 + 0.90 * scaled_aperture**2 + 0.62 * scaled_aperture**2 * self.default_rytov_variance ** (6 / 5)
    return np.exp(num1 / den1 + num2 / den2) - 1

AtmosphericScintillationModel

Bases: ABC

sample abstractmethod

sample(
    rng: Generator,
    link_distance: float,
    rytov_variance: float | None = None,
) -> float
Source code in src/cosmica/comm_link/uncertainty.py
127
128
129
130
131
132
133
@abstractmethod
def sample(
    self,
    rng: np.random.Generator,
    link_distance: float,
    rytov_variance: float | None = None,
) -> float: ...

BinaryCloudModel dataclass

BinaryCloudModel(
    *,
    initial_cloud: bool = False,
    transition_p_0_to_1: float = 0.15,
    transition_p_1_to_0: float = 0.4
)

Bases: CloudStates[bool_]

Binary Cloud Model.

Generate the state of clouds described by a Markov Chain by calling the 'simulate' method.

initial_cloud class-attribute instance-attribute

initial_cloud: bool = False

Initial state. True indicates cloudy and False indicates free-of-clouds.

transition_p_0_to_1 class-attribute instance-attribute

transition_p_0_to_1: float = 0.15

transition_p_1_to_0 class-attribute instance-attribute

transition_p_1_to_0: float = 0.4

simulate

simulate(
    time: NDArray[datetime64], rng: Generator
) -> NDArray[bool_]
PARAMETER DESCRIPTION
time

time_array as a np.ndarray with elements in the np.datetime64 format

TYPE: NDArray[datetime64]

rng

NumPy random number generator.

TYPE: Generator

RETURNS DESCRIPTION
NDArray[bool_]

time series of cloud states on given time frame (1|True = cloudy, 0|False = Free-of-clouds)

Source code in src/cosmica/comm_link/uncertainty.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def simulate(
    self,
    time: Annotated[
        npt.NDArray[np.datetime64],
        Doc("time_array as a np.ndarray with elements in the np.datetime64 format"),
    ],
    rng: Annotated[np.random.Generator, Doc("NumPy random number generator.")],
) -> Annotated[
    npt.NDArray[np.bool_],
    Doc("time series of cloud states on given time frame (1|True = cloudy, 0|False = Free-of-clouds)"),
]:
    sampled = np.empty(time.shape[0], dtype=bool)
    current_state = self.initial_cloud

    for i in range(time.shape[0]):
        sampled[i] = current_state
        next_state = rng.choice(np.array([0, 1]), size=1, p=self._state_tm[int(current_state)])[0]
        current_state = next_state
    return sampled

CloudStates

Bases: ABC

simulate abstractmethod

simulate(
    time: NDArray[datetime64], rng: Generator
) -> NDArray[T]
Source code in src/cosmica/comm_link/uncertainty.py
22
23
@abstractmethod
def simulate(self, time: npt.NDArray[np.datetime64], rng: np.random.Generator) -> npt.NDArray[T]: ...

EdgeFailureModel

Bases: ABC

simulate abstractmethod

simulate(
    time: NDArray[datetime64], rng: Generator
) -> NDArray[T]
Source code in src/cosmica/comm_link/uncertainty.py
71
72
@abstractmethod
def simulate(self, time: npt.NDArray[np.datetime64], rng: np.random.Generator) -> npt.NDArray[T]: ...

ExpEdgeModel dataclass

ExpEdgeModel(
    *,
    reliability: timedelta64 = (
        lambda: timedelta64(15, "D")
    )(),
    recovery_time: timedelta64 = (
        lambda: timedelta64(1800, "s")
    )()
)

Bases: EdgeFailureModel[bool_]

Exponential Edge Model.

Generate the state of an edge following an exponential distribution by calling the 'simulate' method. An edge can be thought as the link between two nodes (e.g. terminals, satellites)

recovery_time class-attribute instance-attribute

recovery_time: timedelta64 = field(
    default_factory=lambda: timedelta64(1800, "s")
)

Recovery time in seconds (expected)

reliability class-attribute instance-attribute

reliability: timedelta64 = field(
    default_factory=lambda: timedelta64(15, "D")
)

time length in days which the edge is expected not to fail (Expected value of exponential distribution)

simulate

simulate(
    time: NDArray[datetime64], rng: Generator
) -> NDArray[bool_]
PARAMETER DESCRIPTION
time

time_array as a np.ndarray with elements in the np.datetime64 format

TYPE: NDArray[datetime64]

rng

NumPy random number generator

TYPE: Generator

RETURNS DESCRIPTION
NDArray[bool_]

time series of edge states on given time frame (True = Failure, False= No-failure)

Source code in src/cosmica/comm_link/uncertainty.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def simulate(
    self,
    time: Annotated[
        npt.NDArray[np.datetime64],
        Doc("time_array as a np.ndarray with elements in the np.datetime64 format"),
    ],
    rng: Annotated[np.random.Generator, Doc("NumPy random number generator")],
) -> Annotated[
    npt.NDArray[np.bool_],
    Doc("time series of edge states on given time frame (True = Failure, False= No-failure)"),
]:
    state_changed = np.zeros(time.shape, dtype=np.bool_)
    time_step = time[1] - time[0]
    total_time = time[-1] - time[0]

    failure_time: np.timedelta64 = rng.exponential(self.reliability / _SECOND) * _SECOND
    while failure_time <= total_time:
        failure_idx: int = np.where(time > (time[0] + failure_time))[0][0]
        state_changed[failure_idx] = True
        recovery_idx = failure_idx + int(self.recovery_time / time_step)
        if recovery_idx < state_changed.shape[0]:  # type: ignore[misc] # Possibly a typing bug in NumPy
            state_changed[recovery_idx] = True
        else:
            return np.logical_xor.accumulate(state_changed)
        failure_time += self.recovery_time + rng.exponential(self.reliability / _SECOND) * _SECOND
    return np.logical_xor.accumulate(state_changed)