Skip to content

experimental_packet_routing

cosmica.experimental_packet_routing

BackupCaseType module-attribute

# Closed set of backup-routing scenario identifiers accepted by the simulator
# (passed as ``backup_case`` to ``PacketCommunicationSimulator.__init__``).
# The exact routing behavior of each case is implemented elsewhere
# (``initialize_forwarding_table_list_from_space_time_graph``) — TODO confirm
# the semantics of each identifier against that implementation.
BackupCaseType = Literal[
    "no-backup",
    "backup-feeder-links",
    "backup-n-hops-links",
    "backup-n-hops-links-and-feeder-links",
    "dual-backup-n-hops-links-and-feeder-links",
]

__all__ module-attribute

__all__ = [
    "BackupCaseType",
    "PacketCommunicationSimulator",
    "PacketRoutingResult",
    "PacketRoutingSetting",
    "RoutingResultVisualizer",
    "RoutingResultVisualizer",
    "SpaceTimeGraph",
]

PacketCommunicationSimulator

PacketCommunicationSimulator(
    time: NDArray[datetime64],
    all_graphs_with_comm_performance: list[Graph],
    nodes_dict: dict[NodeGID, Node],
    demands: list[Demand],
    *,
    backup_case: BackupCaseType = "no-backup",
    hop_limit: int = 1,
    packet_size: int = 10000,
    space_time_graph: SpaceTimeGraph
)

Class that runs a time-series, packet-level communication simulation.

PARAMETER DESCRIPTION
time

Array of datetime64 representing the time points

TYPE: NDArray[datetime64]

all_graphs_with_comm_performance

List of Graph objects representing communication performance over time

TYPE: list[Graph]

nodes_dict

Dictionary mapping NodeGID to Node objects

TYPE: dict[NodeGID, Node]

demands

List of demands

TYPE: list[Demand]

backup_case

Backup case scenario identifier, default is 'no-backup'

TYPE: BackupCaseType DEFAULT: 'no-backup'

hop_limit

Hop limit for backup routing, default is 1

TYPE: int DEFAULT: 1

packet_size

Size of the packet for communication [bit], default is 10,000

TYPE: int DEFAULT: 10000

space_time_graph

SpaceTimeGraph object

TYPE: SpaceTimeGraph

Source code in src/cosmica/experimental_packet_routing/simulator.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def __init__(
    self,
    time: Annotated[
        npt.NDArray[np.datetime64],
        Doc("Array of datetime64 representing the time points"),
    ],
    all_graphs_with_comm_performance: Annotated[
        list[Graph],
        Doc("List of Graph objects representing communication performance over time"),
    ],
    nodes_dict: Annotated[dict[NodeGID, Node], Doc("Dictionary mapping NodeGID to Node objects")],
    demands: Annotated[list[Demand], Doc("List of demands")],
    *,
    backup_case: Annotated[
        BackupCaseType,
        Doc("Backup case scenario identifier, default is 'no-backup'"),
    ] = "no-backup",
    hop_limit: Annotated[int, Doc("Hop limit for backup routing, default is 1")] = 1,
    packet_size: Annotated[int, Doc("Size of the packet for communication [bit], default is 10,000")] = 10_000,
    space_time_graph: Annotated[
        SpaceTimeGraph,
        Doc("SpaceTimeGraph object"),
    ],
) -> None:
    """Initialize the simulator and precompute each node's routing knowledge.

    Raises:
        ValueError: If ``time`` and ``all_graphs_with_comm_performance`` do not
            have the same length (one graph is required per time step).
    """
    # Explicit validation instead of ``assert``: asserts are stripped under
    # ``python -O`` and would silently admit mismatched inputs.
    if len(time) != len(all_graphs_with_comm_performance):
        msg = (
            "time and all_graphs_with_comm_performance must have the same length: "
            f"{len(time)} != {len(all_graphs_with_comm_performance)}"
        )
        raise ValueError(msg)

    self.time: npt.NDArray[np.datetime64] = time
    self.all_graphs_with_comm_performance: list[Graph] = all_graphs_with_comm_performance
    self.nodes_dict: dict[NodeGID, Node] = nodes_dict
    self.demands: list[Demand] = demands

    self.backup_case: BackupCaseType = backup_case
    self.hop_limit: int = hop_limit
    self.packet_size: int = packet_size

    # Per-node view of the network: each node gets its own deep copy of the
    # space-time graph so that later per-node updates (e.g. from LSA data)
    # do not leak into other nodes' knowledge.
    self.node_knowledge_known_by_each_node: dict[Node, NodeKnowledge] = {}
    self.space_time_graph = space_time_graph
    for node in tqdm(self.nodes_dict.values(), desc="Initializing node knowledge"):
        self.node_knowledge_known_by_each_node[node] = NodeKnowledge(
            target_node=node,
            space_time_graph=copy.deepcopy(self.space_time_graph),
            forwarding_table_time_list=initialize_forwarding_table_list_from_space_time_graph(
                space_time_graph=self.space_time_graph,
                src_node=node,
                weight="weight",
                backup_case=self.backup_case,
                hops_limit=self.hop_limit,
            ),
        )

all_graphs_with_comm_performance instance-attribute

all_graphs_with_comm_performance: list[Graph] = (
    all_graphs_with_comm_performance
)

backup_case instance-attribute

backup_case: BackupCaseType = backup_case

demands instance-attribute

demands: list[Demand] = demands

hop_limit instance-attribute

hop_limit: int = hop_limit

node_knowledge_known_by_each_node instance-attribute

node_knowledge_known_by_each_node: dict[
    Node, NodeKnowledge
] = {}

nodes_dict instance-attribute

nodes_dict: dict[NodeGID, Node] = nodes_dict

packet_size instance-attribute

packet_size: int = packet_size

space_time_graph instance-attribute

space_time_graph = space_time_graph

time instance-attribute

time: NDArray[datetime64] = time

create_packet_routing_setting

create_packet_routing_setting() -> PacketRoutingSetting

Generate and return a PacketRoutingSetting object.

Source code in src/cosmica/experimental_packet_routing/simulator.py
 95
 96
 97
 98
 99
100
101
102
103
104
def create_packet_routing_setting(self) -> PacketRoutingSetting:
    """Snapshot this simulator's configuration as a PacketRoutingSetting."""
    setting_kwargs = {
        "time": self.time,
        "nodes_dict": self.nodes_dict,
        "demands": self.demands,
        "backup_case": self.backup_case,
        "hop_limit": self.hop_limit,
        "packet_size": self.packet_size,
    }
    return PacketRoutingSetting(**setting_kwargs)

run

run(
    *,
    rng: Generator | None = None,
    edge_remove_schedule: (
        list[tuple[datetime64, tuple[Node, Node]]] | None
    ) = None,
    failure_detection_time: timedelta64 | None = None,
    enable_random_routing_when_edge_failure: bool = False,
    prevent_loop: bool = False,
    with_lsa: bool = True,
    lsa_case: LsaCaseType = "from-source-to-all"
) -> PacketRoutingResult
PARAMETER DESCRIPTION
rng

NumPy random number generator. If None, use default.

TYPE: Generator | None DEFAULT: None

edge_remove_schedule

List of edge removal schedule

TYPE: list[tuple[datetime64, tuple[Node, Node]]] | None DEFAULT: None

failure_detection_time

Time taken for failure detection

TYPE: timedelta64 | None DEFAULT: None

enable_random_routing_when_edge_failure

Flag to enable random routing when edge failure

TYPE: bool DEFAULT: False

prevent_loop

Flag to prevent loop in routing

TYPE: bool DEFAULT: False

with_lsa

Flag to enable LSA data

TYPE: bool DEFAULT: True

lsa_case

Case scenario identifier for LSA data, default is 'from-source-to-all'

TYPE: LsaCaseType DEFAULT: 'from-source-to-all'

Source code in src/cosmica/experimental_packet_routing/simulator.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
def run(  # noqa: C901, PLR0912, PLR0915
    self,
    *,
    rng: Annotated[
        np.random.Generator | None,
        Doc("NumPy random number generator. If None, use default."),
    ] = None,
    edge_remove_schedule: Annotated[
        list[tuple[np.datetime64, tuple[Node, Node]]] | None,
        Doc("List of edge removal schedule"),
    ] = None,
    failure_detection_time: Annotated[np.timedelta64 | None, Doc("Time taken for failure detection")] = None,
    enable_random_routing_when_edge_failure: Annotated[
        bool,
        Doc("Flag to enable random routing when edge failure"),
    ] = False,
    prevent_loop: Annotated[bool, Doc("Flag to prevent loop in routing")] = False,
    with_lsa: Annotated[bool, Doc("Flag to enable LSA data")] = True,
    lsa_case: Annotated[
        LsaCaseType,
        Doc("Case scenario identifier for LSA data, default is 'nominal'"),
    ] = "from-source-to-all",
) -> PacketRoutingResult:
    ## デフォルト値の設定 ========================================
    rng = rng if rng is not None else np.random.default_rng()
    edge_remove_schedule = edge_remove_schedule if edge_remove_schedule is not None else []
    failure_detection_time = (
        failure_detection_time if failure_detection_time is not None else np.timedelta64(10, "ms")
    )

    ## シミュレーション情報をログ出力 ========================================
    logger.info(f"Starting packet communication simulation with {len(self.time)} time steps")
    logger.info(f"Number of nodes: {len(self.nodes_dict)}")
    logger.info(f"Number of demands: {len(self.demands)}")
    if edge_remove_schedule:
        logger.info(f"Edge removal schedule: {len(edge_remove_schedule)} events")
        for timing, edge in edge_remove_schedule:
            logger.info(f"  - {edge[0].id} <-> {edge[1].id} at {timing}")

    ## データ格納用のリスト ========================================
    all_graphs_after_simulation: list[Graph] = []  # シミュレーション後のグラフ履歴
    comm_data_demand_list: list[CommDataDemand] = []
    comm_data_lsa_list: list[CommDataLSA] = []

    ## シミュレーションの実行 ========================================
    for time_idx, current_time in tqdm(
        enumerate(self.time),
        desc="Running packet simulation",
        total=len(self.time),
    ):
        if time_idx < len(self.time) - 1:
            time_step: np.timedelta64 = self.time[time_idx + 1] - self.time[time_idx]
            time_step_s: float = float(time_step / np.timedelta64(1, "s"))

        ## タイムステップにおけるグラフの生成 ========================================
        _graph: Graph = copy.deepcopy(self.all_graphs_with_comm_performance[time_idx])

        # Initialize the edge attributes
        nx.set_edge_attributes(
            _graph,
            {edge: {"bandwidth_usage_for_demand_data": 0} for edge in _graph.edges},
        )
        nx.set_edge_attributes(
            _graph,
            {edge: {"bandwidth_usage_for_lsa_data": 0} for edge in _graph.edges},
        )

        ## Edgeの切断 ========================================
        for edge_remove_timing, edge_to_remove in edge_remove_schedule:
            if current_time >= edge_remove_timing:
                logger.info(
                    f"Edge {edge_to_remove[0].id} <-> {edge_to_remove[1].id} disconnected at {current_time}",
                )
                remove_edge_safe(_graph, *edge_to_remove)
                # Edge上にあるデータはパケットロスと判断
                packet_loss_count_demand = 0
                for comm_data_demand in comm_data_demand_list:
                    if isinstance(comm_data_demand.current_position, tuple) and set(
                        comm_data_demand.current_position,
                    ) == set(edge_to_remove):
                        comm_data_demand.packet_loss = True
                        comm_data_demand.delay = np.inf
                        packet_loss_count_demand += 1
                packet_loss_count_lsa = 0
                for comm_data_lsa in comm_data_lsa_list:
                    if isinstance(comm_data_lsa.current_position, tuple) and set(
                        comm_data_lsa.current_position,
                    ) == set(edge_to_remove):
                        comm_data_lsa.packet_loss = True
                        comm_data_lsa.delay = np.inf
                        packet_loss_count_lsa += 1
                if packet_loss_count_demand > 0 or packet_loss_count_lsa > 0:
                    logger.info(
                        f"Packet loss due to edge disconnection: "
                        f"{packet_loss_count_demand} demand packets, {packet_loss_count_lsa} LSA packets",
                    )

        ## Edgeの復旧 ========================================
        # TODO(Takashima): 未実装

        ## 通信データの生成 ========================================
        # LSAデータの生成
        if with_lsa:
            for edge_remove_timing, edge_to_remove in edge_remove_schedule:
                failure_detection_timing: np.datetime64 = edge_remove_timing + failure_detection_time
                if (time_idx == 0 and failure_detection_timing <= self.time[time_idx]) or (
                    time_idx > 0 and self.time[time_idx - 1] < failure_detection_timing <= self.time[time_idx]
                ):
                    # 切断Edgeに接続している node 情報を更新
                    for detect_node in edge_to_remove:
                        # 自身の情報を更新
                        self.node_knowledge_known_by_each_node[detect_node].update_node_knowledge_based_on_lsa(
                            comm_data_lsa=CommDataLSA(
                                data_size=self.packet_size,
                                packet_size=self.packet_size,
                                packet_num=1,
                                dst_node=detect_node,
                                next_node=detect_node,
                                current_position=detect_node,
                                path=[detect_node],
                                generated_time=current_time,
                                time=current_time,
                                time_from_generated=0.0,
                                time_remaining_for_current_position=0.0,
                                failure_position=edge_to_remove,
                            ),
                            current_time=current_time,
                            weight="weight",
                            backup_case=self.backup_case,
                            hops_limit=self.hop_limit,
                        )
                    # LSAデータの生成
                    if lsa_case == "from-source-to-all":
                        #  切断Edgeに接続している node から 他のすべての node に対して LSA データを生成
                        for detect_node in edge_to_remove:
                            for dst_node in self.nodes_dict.values():
                                if dst_node == detect_node:
                                    # dst_node = detect_node の場合はスキップ
                                    continue
                                next_node = calc_next_node_from_node_knowledge(
                                    node_knowledge=self.node_knowledge_known_by_each_node[detect_node],
                                    dst_node=dst_node,
                                    path=[detect_node],
                                    current_time=current_time,
                                    enable_random_routing_when_edge_failure=enable_random_routing_when_edge_failure,
                                    prevent_loop=prevent_loop,
                                )
                                if next_node is None:
                                    continue
                                comm_data_lsa = CommDataLSA(
                                    data_size=self.packet_size,
                                    packet_size=self.packet_size,
                                    packet_num=1,
                                    dst_node=dst_node,
                                    next_node=next_node,
                                    current_position=detect_node,
                                    path=[detect_node],
                                    generated_time=current_time,
                                    time=current_time,
                                    time_from_generated=0.0,
                                    time_remaining_for_current_position=0.0,
                                    failure_position=edge_to_remove,
                                )
                                comm_data_lsa_list.append(comm_data_lsa)
                    elif lsa_case == "adjacent":
                        # 切断Edgeに接続している node から 隣接 node に対して LSA データを生成
                        for detect_node in edge_to_remove:
                            for dst_node in list(_graph.neighbors(detect_node)):
                                next_node = calc_next_node_from_node_knowledge(
                                    node_knowledge=self.node_knowledge_known_by_each_node[detect_node],
                                    dst_node=dst_node,
                                    path=[detect_node],
                                    current_time=current_time,
                                    enable_random_routing_when_edge_failure=enable_random_routing_when_edge_failure,
                                    prevent_loop=prevent_loop,
                                )
                                if next_node is None:
                                    continue
                                comm_data_lsa = CommDataLSA(
                                    data_size=self.packet_size,
                                    packet_size=self.packet_size,
                                    packet_num=1,
                                    dst_node=dst_node,
                                    next_node=next_node,
                                    current_position=detect_node,
                                    path=[detect_node],
                                    generated_time=current_time,
                                    time=current_time,
                                    time_from_generated=0.0,
                                    time_remaining_for_current_position=0.0,
                                    failure_position=edge_to_remove,
                                )
                                comm_data_lsa_list.append(comm_data_lsa)

        # Demandデータの生成
        # 1パケットずつ扱うとシミュレーション時間がかかるので、宛先が同じ & 生成時刻が同じパケットをcomm_dataとしてまとめている  # noqa: E501
        for demand in self.demands:
            if isinstance(demand, ConstantCommunicationDemand) or (
                isinstance(demand, TemporaryCommunicationDemand) and demand.is_active(current_time)
            ):
                if demand.distribution == "uniform":
                    packet_num = int(demand.transmission_rate * time_step_s / self.packet_size)
                elif demand.distribution == "poisson":
                    packet_num = int(rng.poisson(lam=demand.transmission_rate * time_step_s / self.packet_size))

                next_node = calc_next_node_from_node_knowledge(
                    node_knowledge=self.node_knowledge_known_by_each_node[self.nodes_dict[demand.source]],
                    dst_node=self.nodes_dict[demand.destination],
                    path=[self.nodes_dict[demand.source]],
                    current_time=current_time,
                    enable_random_routing_when_edge_failure=enable_random_routing_when_edge_failure,
                    prevent_loop=prevent_loop,
                )
                if next_node is None:
                    continue
                comm_data_demand = CommDataDemand(
                    demand_id=demand.id,
                    data_size=packet_num * self.packet_size,
                    packet_size=self.packet_size,
                    packet_num=packet_num,
                    dst_node=self.nodes_dict[demand.destination],
                    next_node=next_node,
                    current_position=self.nodes_dict[demand.source],
                    path=[self.nodes_dict[demand.source]],
                    generated_time=current_time,
                    time=current_time,
                    time_from_generated=0.0,
                    time_remaining_for_current_position=0.0,
                )
                comm_data_demand_list.append(comm_data_demand)

        ## 通信データの伝播 ========================================
        # TODO(Takashima): comm_data_lsa_listやcomm_data_demand_listについて、届いたデータやパケットロスのデータを削除しない場合、forのループ回数が増大していくので、処理を検討  # noqa: E501

        # LSAデータの伝播
        if with_lsa:
            for comm_data_lsa in comm_data_lsa_list:
                if comm_data_lsa.packet_loss or comm_data_lsa.reach_dst:
                    continue
                if comm_data_lsa.generated_time > current_time + time_step:
                    continue

                # 共通処理
                if comm_data_lsa.time > current_time:
                    comm_data_lsa.time_remaining_for_current_position -= float(
                        (current_time + time_step - comm_data_lsa.time) / np.timedelta64(1, "s"),
                    )
                    comm_data_lsa.time = current_time + time_step
                else:
                    comm_data_lsa.time_remaining_for_current_position -= time_step_s
                    comm_data_lsa.time += time_step

                # Node -> Edge、Edge -> Nodeなど境界を超える時のデータの処理
                while comm_data_lsa.time_remaining_for_current_position < 0:
                    # Node -> Edge
                    if not isinstance(comm_data_lsa.current_position, tuple):
                        next_edge: tuple[Node, Node] = (
                            comm_data_lsa.current_position,
                            comm_data_lsa.next_node,
                        )

                        # 転送先のEdgeが存在しない時 -> パケットロスと判断
                        if not has_edge_bidirectional(_graph, *next_edge):
                            comm_data_lsa.packet_loss = True
                            comm_data_lsa.delay = np.inf
                            break

                        # Congestionの考慮 -> LSAデータについては、占有bandwidthの考慮は行わない

                        # 現在位置の更新
                        comm_data_lsa.current_position = next_edge
                        edge_data = get_edge_data(_graph, *next_edge)
                        if edge_data is not None:
                            edge_data["bandwidth_usage_for_lsa_data"] += comm_data_lsa.data_size / time_step_s

                            # 現在位置に留まる残時間の更新
                            comm_data_lsa.time_remaining_for_current_position += edge_data["delay"]
                            comm_data_lsa.time_from_generated += edge_data["delay"]

                    # Edge -> Node
                    elif isinstance(comm_data_lsa.current_position, tuple):
                        # 現在位置の更新
                        comm_data_lsa.current_position = comm_data_lsa.next_node
                        comm_data_lsa.path.append(comm_data_lsa.current_position)

                        # 現在の位置が目的地の場合
                        if comm_data_lsa.current_position == comm_data_lsa.dst_node:
                            comm_data_lsa.reach_dst = True
                            comm_data_lsa.delay = comm_data_lsa.time_from_generated
                            logger.info(
                                f"LSA data received at {comm_data_lsa.dst_node.id} "
                                f"(delay: {comm_data_lsa.delay:.6f}s, failure: {comm_data_lsa.failure_position})",
                            )
                            if lsa_case == "from-source-to-all":
                                self.node_knowledge_known_by_each_node[
                                    comm_data_lsa.current_position
                                ].update_node_knowledge_based_on_lsa(
                                    comm_data_lsa,
                                    current_time,
                                    weight="weight",
                                    backup_case=self.backup_case,
                                    hops_limit=self.hop_limit,
                                )
                            elif lsa_case == "adjacent":
                                already_registered = False
                                for failure_assumed_edge in self.node_knowledge_known_by_each_node[
                                    comm_data_lsa.current_position
                                ].failure_assumed_edge_list:
                                    if isinstance(comm_data_lsa.failure_position, tuple) and (
                                        set(failure_assumed_edge) == set(comm_data_lsa.failure_position)
                                    ):
                                        already_registered = True

                                # 同じリンク切断が既に登録されている場合は何もしない
                                if already_registered:
                                    logger.info(
                                        f"LSA data received at {comm_data_lsa.dst_node.id} "
                                        f"(already registered failure: {comm_data_lsa.failure_position})",
                                    )
                                # 登録されていないリンク切断情報の場合のみ、情報をアップデートし、隣接ノードに伝搬
                                else:
                                    logger.info(
                                        f"LSA data received at {comm_data_lsa.dst_node.id} "
                                        f"(new failure info: {comm_data_lsa.failure_position})",
                                    )
                                    self.node_knowledge_known_by_each_node[
                                        comm_data_lsa.current_position
                                    ].update_node_knowledge_based_on_lsa(
                                        comm_data_lsa,
                                        current_time,
                                        weight="weight",
                                        backup_case=self.backup_case,
                                        hops_limit=self.hop_limit,
                                    )

                                    # 隣接nodeにLSAデータを伝搬
                                    graph_known_by_node = self.node_knowledge_known_by_each_node[
                                        comm_data_lsa.current_position
                                    ].space_time_graph.get_space_time_graph_at_time(current_time)
                                    next_destination_list = list(
                                        graph_known_by_node.neighbors(comm_data_lsa.current_position),
                                    )

                                    for dst_node in next_destination_list:
                                        next_node = calc_next_node_from_node_knowledge(
                                            node_knowledge=self.node_knowledge_known_by_each_node[
                                                comm_data_lsa.current_position
                                            ],
                                            dst_node=dst_node,
                                            path=[comm_data_lsa.current_position],
                                            current_time=current_time,
                                            enable_random_routing_when_edge_failure=enable_random_routing_when_edge_failure,
                                            prevent_loop=prevent_loop,
                                        )
                                        if next_node is None:
                                            continue

                                        generated_time = (
                                            comm_data_lsa.generated_time
                                            + np.timedelta64(
                                                int(comm_data_lsa.delay * 1e3),
                                                "ms",
                                            )  # TODO(): 現状だとms単位で生成され、より細かい単位に対応できない
                                            + np.timedelta64(
                                                10,
                                                "ms",
                                            )  # TODO(): 処理遅延についてパラメータで設定できるようにする
                                        )
                                        comm_data_lsa_new = CommDataLSA(
                                            data_size=self.packet_size,
                                            packet_size=self.packet_size,
                                            packet_num=1,
                                            dst_node=dst_node,
                                            next_node=next_node,
                                            current_position=comm_data_lsa.current_position,
                                            path=[comm_data_lsa.current_position],
                                            generated_time=generated_time,
                                            time=generated_time,
                                            time_from_generated=0,
                                            time_remaining_for_current_position=0,
                                            failure_position=comm_data_lsa.failure_position,
                                        )
                                        comm_data_lsa_list.append(comm_data_lsa_new)
                            else:
                                pass

                            break  # 目的地に到達した場合は、次の通信データに移る

                        # 現在の位置が目的地でない場合
                        # 現在位置に留まる残時間の更新
                        # TODO(Takashima): 実際にはBufferに入れたりするのでその処理を検討
                        comm_data_lsa.time_remaining_for_current_position += _graph.nodes[
                            comm_data_lsa.current_position
                        ]["delay"]
                        comm_data_lsa.time_from_generated += _graph.nodes[comm_data_lsa.current_position]["delay"]

                        # ルーティングテーブルの参照
                        next_node = calc_next_node_from_node_knowledge(
                            node_knowledge=self.node_knowledge_known_by_each_node[comm_data_lsa.current_position],
                            dst_node=comm_data_lsa.dst_node,
                            path=comm_data_lsa.path,
                            current_time=current_time,
                            enable_random_routing_when_edge_failure=enable_random_routing_when_edge_failure,
                            prevent_loop=prevent_loop,
                        )
                        if next_node is None:
                            comm_data_lsa.packet_loss = True
                            comm_data_lsa.delay = np.inf
                            break
                        comm_data_lsa.next_node = next_node

        # Demandデータの伝播
        # TODO(Takashima): 現状は先に生成されたデータの伝播が優先されるので, 優先順位について検討
        for comm_data_demand in comm_data_demand_list:
            if comm_data_demand.packet_loss or comm_data_demand.reach_dst:
                continue
            if comm_data_demand.generated_time > current_time + time_step:
                continue

            # 共通処理
            if comm_data_demand.time > current_time:
                comm_data_demand.time_remaining_for_current_position -= float(
                    (current_time + time_step - comm_data_demand.time) / np.timedelta64(1, "s"),
                )
                comm_data_demand.time = current_time + time_step
            else:
                comm_data_demand.time_remaining_for_current_position -= time_step_s
                comm_data_demand.time += time_step

            # Node -> Edge、Edge -> Nodeなど境界を超える時のデータの処理
            while comm_data_demand.time_remaining_for_current_position < 0:
                # Node -> Edge
                if not isinstance(comm_data_demand.current_position, tuple):
                    next_edge = (
                        comm_data_demand.current_position,
                        comm_data_demand.next_node,
                    )

                    # 転送先のEdgeが存在しない時 -> パケットロスと判断
                    if not has_edge_bidirectional(_graph, *next_edge):
                        comm_data_demand.packet_loss = True
                        comm_data_demand.delay = np.inf
                        break

                    # Congestionの考慮
                    edge_data = get_edge_data(_graph, *next_edge)
                    if edge_data is None:
                        comm_data_demand.packet_loss = True
                        comm_data_demand.delay = np.inf
                        break

                    if (
                        edge_data["bandwidth_usage_for_demand_data"] + comm_data_demand.data_size / time_step_s
                        > edge_data["link_capacity"]
                    ):
                        comm_data_demand.packet_loss = True
                        comm_data_demand.delay = np.inf
                        break

                    # 現在位置の更新
                    comm_data_demand.current_position = next_edge
                    edge_data["bandwidth_usage_for_demand_data"] += comm_data_demand.data_size / time_step_s

                    # 現在位置に留まる残時間の更新
                    comm_data_demand.time_remaining_for_current_position += edge_data["delay"]
                    comm_data_demand.time_from_generated += edge_data["delay"]

                # Edge -> Node
                elif isinstance(comm_data_demand.current_position, tuple):
                    # 現在位置の更新
                    comm_data_demand.current_position = comm_data_demand.next_node
                    comm_data_demand.path.append(comm_data_demand.current_position)

                    # 現在の位置が目的地の場合
                    if comm_data_demand.current_position == comm_data_demand.dst_node:
                        comm_data_demand.reach_dst = True
                        comm_data_demand.delay = comm_data_demand.time_from_generated
                        break  # 目的地に到達した場合は、次の通信データに移る

                    # 現在の位置が目的地でない場合
                    # 現在位置に留まる残時間の更新
                    # TODO(Takashima): 実際にはBufferに入れたりするのでその処理を検討
                    # -> 手計算では, forwarding rate が10Gbpsの場合 Queuing Delayは10-100μsになりそうで小さい
                    comm_data_demand.time_remaining_for_current_position += _graph.nodes[
                        comm_data_demand.current_position
                    ]["delay"]
                    comm_data_demand.time_from_generated += _graph.nodes[comm_data_demand.current_position]["delay"]

                    # ルーティングテーブルの参照
                    next_node = calc_next_node_from_node_knowledge(
                        node_knowledge=self.node_knowledge_known_by_each_node[comm_data_demand.current_position],
                        dst_node=comm_data_demand.dst_node,
                        path=comm_data_demand.path,
                        current_time=current_time,
                        enable_random_routing_when_edge_failure=enable_random_routing_when_edge_failure,
                        prevent_loop=prevent_loop,
                    )
                    if next_node is None:
                        comm_data_demand.packet_loss = True
                        comm_data_demand.delay = np.inf
                        break
                    comm_data_demand.next_node = next_node

        ## Save the graph to the list of graphs after simulation for each time step ================================
        all_graphs_after_simulation.append(_graph)

    ## Save simulation results ========================================
    # シミュレーション結果の統計をログ出力
    total_demand_packets = len(comm_data_demand_list)
    successful_demand_packets = sum(1 for comm in comm_data_demand_list if comm.reach_dst)
    packet_loss_demand_packets = sum(1 for comm in comm_data_demand_list if comm.packet_loss)

    total_lsa_packets = len(comm_data_lsa_list)
    successful_lsa_packets = sum(1 for comm in comm_data_lsa_list if comm.reach_dst)
    packet_loss_lsa_packets = sum(1 for comm in comm_data_lsa_list if comm.packet_loss)

    logger.info("Simulation completed")
    logger.info(
        f"Demand packets - Total: {total_demand_packets}, "
        f"Successful: {successful_demand_packets}, "
        f"Lost: {packet_loss_demand_packets}",
    )
    logger.info(
        f"LSA packets - Total: {total_lsa_packets}, "
        f"Successful: {successful_lsa_packets}, "
        f"Lost: {packet_loss_lsa_packets}",
    )

    if successful_demand_packets > 0:
        avg_delay_demand = np.mean([comm.delay for comm in comm_data_demand_list if comm.reach_dst])
        logger.info(f"Average delay for successful demand packets: {avg_delay_demand:.6f}s")

    return PacketRoutingResult(
        all_graphs_after_simulation=all_graphs_after_simulation,
        comm_data_demand_list=comm_data_demand_list,
        comm_data_lsa_list=comm_data_lsa_list,
        node_knowledge_known_by_each_node=self.node_knowledge_known_by_each_node,
    )

PacketRoutingResult dataclass

PacketRoutingResult(
    *,
    all_graphs_after_simulation: list[Graph] = list[
        Graph
    ](),
    node_knowledge_known_by_each_node: dict[
        Node, NodeKnowledge
    ] = dict[Node, NodeKnowledge](),
    comm_data_demand_list: list[CommDataDemand] = list[
        CommDataDemand
    ](),
    comm_data_lsa_list: list[CommDataLSA] = list[
        CommDataLSA
    ]()
)

all_graphs_after_simulation class-attribute instance-attribute

all_graphs_after_simulation: list[Graph] = field(
    default_factory=list[Graph]
)

comm_data_demand_list class-attribute instance-attribute

comm_data_demand_list: list[CommDataDemand] = field(
    default_factory=list[CommDataDemand]
)

comm_data_lsa_list class-attribute instance-attribute

comm_data_lsa_list: list[CommDataLSA] = field(
    default_factory=list[CommDataLSA]
)

node_knowledge_known_by_each_node class-attribute instance-attribute

node_knowledge_known_by_each_node: dict[
    Node, NodeKnowledge
] = field(default_factory=dict[Node, NodeKnowledge])

load classmethod

load(
    graphs_path: Path | None = None,
    node_knowledge_path: Path | None = None,
    comm_data_demand_path: Path | None = None,
    comm_data_lsa_path: Path | None = None,
) -> Self

指定されたパスからpickleファイルを読み込み.

Source code in src/cosmica/experimental_packet_routing/dtos.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
@classmethod
def load(
    cls,
    graphs_path: Path | None = None,
    node_knowledge_path: Path | None = None,
    comm_data_demand_path: Path | None = None,
    comm_data_lsa_path: Path | None = None,
) -> Self:
    """Load pickled simulation artifacts and build a PacketRoutingResult.

    Every path is optional; a ``None`` or non-existent path leaves the
    corresponding field at its empty default and logs that nothing was
    loaded.

    Args:
        graphs_path: Pickle of the per-step graph list.
        node_knowledge_path: Pickle of the per-node knowledge mapping.
        comm_data_demand_path: Pickle of the demand communication-data list.
        comm_data_lsa_path: Pickle of the LSA communication-data list.

    Returns:
        A new instance populated with whatever artifacts could be loaded.
    """

    def _load_pickle(path: Path | None, loaded_label: str, missing_msg: str, default: list | dict) -> list | dict:
        # Shared load-or-default helper for the four pickle artifacts.
        if path is None or not path.exists():
            logger.info(missing_msg)
            return default
        with path.open("rb") as f:
            loaded = pickle.load(f)  # noqa: S301
        logger.info(f"{loaded_label}: {len(loaded)}")
        return loaded

    all_graphs_after_simulation = _load_pickle(
        graphs_path,
        "Loaded all graphs after simulation",
        "No graph data to load.",
        [],
    )
    node_knowledge_known_by_each_node = _load_pickle(
        node_knowledge_path,
        "Loaded network information",
        "No node knowledge data to load.",
        {},
    )
    comm_data_demand_list = _load_pickle(
        comm_data_demand_path,
        "Loaded comm data demand",
        "No comm data demand to load.",
        [],
    )
    comm_data_lsa_list = _load_pickle(
        comm_data_lsa_path,
        "Loaded comm data lsa",
        "No comm data lsa to load.",
        [],
    )

    return cls(
        all_graphs_after_simulation=all_graphs_after_simulation,
        node_knowledge_known_by_each_node=node_knowledge_known_by_each_node,
        comm_data_demand_list=comm_data_demand_list,
        comm_data_lsa_list=comm_data_lsa_list,
    )

save_all_graphs_after_simulation

save_all_graphs_after_simulation(save_path: Path) -> None
Source code in src/cosmica/experimental_packet_routing/dtos.py
44
45
46
47
48
49
50
def save_all_graphs_after_simulation(self, save_path: Path) -> None:
    """Pickle the per-time-step graph list to *save_path*."""
    # NOTE(review): the field defaults to an empty list, so this guard only
    # fires when the attribute was explicitly set to None — verify intent.
    if self.all_graphs_after_simulation is None:
        logger.info("No graph data to save.")
        return
    with save_path.open("wb") as f:
        pickle.dump(self.all_graphs_after_simulation, f)
    logger.info(f"Saved the simulation results of graph to {save_path}")

save_comm_data_demand_list

save_comm_data_demand_list(save_path: Path) -> None
Source code in src/cosmica/experimental_packet_routing/dtos.py
60
61
62
63
64
65
66
def save_comm_data_demand_list(self, save_path: Path) -> None:
    """Pickle the demand communication-data list to *save_path*."""
    # NOTE(review): the field defaults to an empty list, so this guard only
    # fires when the attribute was explicitly set to None — verify intent.
    if self.comm_data_demand_list is None:
        logger.info("No comm data demand to save.")
        return
    with save_path.open("wb") as f:
        pickle.dump(self.comm_data_demand_list, f)
    logger.info(f"Saved the simulation results of comm data demand to {save_path}")

save_comm_data_lsa_list

save_comm_data_lsa_list(save_path: Path) -> None
Source code in src/cosmica/experimental_packet_routing/dtos.py
68
69
70
71
72
73
74
def save_comm_data_lsa_list(self, save_path: Path) -> None:
    """Pickle the LSA communication-data list to *save_path*."""
    # NOTE(review): the field defaults to an empty list, so this guard only
    # fires when the attribute was explicitly set to None — verify intent.
    if self.comm_data_lsa_list is None:
        logger.info("No comm data lsa to save.")
        return
    with save_path.open("wb") as f:
        pickle.dump(self.comm_data_lsa_list, f)
    logger.info(f"Saved the simulation results of comm data lsa to {save_path}")

save_node_knowledge_known_by_each_node

save_node_knowledge_known_by_each_node(
    save_path: Path,
) -> None
Source code in src/cosmica/experimental_packet_routing/dtos.py
52
53
54
55
56
57
58
def save_node_knowledge_known_by_each_node(self, save_path: Path) -> None:
    """Pickle the per-node knowledge mapping to *save_path*."""
    # NOTE(review): the field defaults to an empty dict, so this guard only
    # fires when the attribute was explicitly set to None — verify intent.
    if self.node_knowledge_known_by_each_node is None:
        logger.info("No node knowledge data to save.")
        return
    with save_path.open("wb") as f:
        pickle.dump(self.node_knowledge_known_by_each_node, f)
    logger.info(f"Saved the simulation results of node knowledge to {save_path}")

PacketRoutingSetting dataclass

PacketRoutingSetting(
    *,
    time: NDArray[datetime64],
    nodes_dict: dict[NodeGID, Node],
    demands: list[Demand],
    backup_case: BackupCaseType,
    hop_limit: int,
    packet_size: int
)

backup_case instance-attribute

backup_case: BackupCaseType

demands instance-attribute

demands: list[Demand]

hop_limit instance-attribute

hop_limit: int

nodes_dict instance-attribute

nodes_dict: dict[NodeGID, Node]

packet_size instance-attribute

packet_size: int

time instance-attribute

time: NDArray[datetime64]

load classmethod

load(load_path: Path) -> PacketRoutingSetting
Source code in src/cosmica/experimental_packet_routing/dtos.py
140
141
142
143
144
145
@classmethod
def load(cls, load_path: Path) -> PacketRoutingSetting:
    """Load a pickled PacketRoutingSetting from *load_path*.

    Returns:
        The unpickled setting object.
    """
    # BUG FIX: the original returned from inside the `with` block, leaving
    # the log statement (and a stray `return None`) unreachable.
    with load_path.open("rb") as f:
        setting = pickle.load(f)  # noqa: S301
    logger.info(f"Loaded the packet routing setting from {load_path}")
    return setting

save

save(save_path: Path) -> None
Source code in src/cosmica/experimental_packet_routing/dtos.py
135
136
137
138
def save(self, save_path: Path) -> None:
    """Pickle this setting object to *save_path*."""
    with save_path.open("wb") as fp:
        pickle.dump(self, fp)
    logger.info(f"Saved the packet routing setting to {save_path}")

RoutingResultVisualizer

RoutingResultVisualizer(
    simulation_settings: PacketRoutingSetting,
    packet_routing_result: PacketRoutingResult,
)
Source code in src/cosmica/experimental_packet_routing/routing_result_visualizer.py
32
33
34
35
36
37
38
def __init__(
    self,
    simulation_settings: PacketRoutingSetting,
    packet_routing_result: PacketRoutingResult,
) -> None:
    """Keep references to the settings and routing result used for plotting."""
    self.packet_routing_result: PacketRoutingResult = packet_routing_result
    self.simulation_settings: PacketRoutingSetting = simulation_settings

packet_routing_result instance-attribute

packet_routing_result: PacketRoutingResult = (
    packet_routing_result
)

simulation_settings instance-attribute

simulation_settings: PacketRoutingSetting = (
    simulation_settings
)

time cached property

time: NDArray

time_from_epoch cached property

time_from_epoch: NDArray

time_step_array cached property

time_step_array: NDArray

calculate_average_delay

calculate_average_delay(
    *,
    time_from: datetime64 | None = None,
    time_to: datetime64 | None = None,
    weighted_data_size: bool = True
) -> float

到着したデータについて平均遅延時間を計算する.

Source code in src/cosmica/experimental_packet_routing/routing_result_visualizer.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def calculate_average_delay(
    self,
    *,
    time_from: np.datetime64 | None = None,
    time_to: np.datetime64 | None = None,
    weighted_data_size: bool = True,
) -> float:
    """Compute the average delay of demand data that reached its destination.

    Only packets with a finite delay are considered; when
    *weighted_data_size* is True the mean is weighted by each packet's
    data size.
    """
    demand_list = self.packet_routing_result.comm_data_demand_list
    calc_df = pd.DataFrame(
        {
            "generated_time": [comm.generated_time for comm in demand_list],
            "delay": [comm.delay for comm in demand_list],
            "data_size": [comm.data_size for comm in demand_list],
        },
    )

    # Restrict to the requested generation-time window.
    if time_from is not None:
        calc_df = calc_df[calc_df["generated_time"] >= time_from]
    if time_to is not None:
        calc_df = calc_df[calc_df["generated_time"] <= time_to]

    # Packets that never arrived carry an infinite delay; drop them.
    finite_mask = np.isfinite(calc_df["delay"])
    finite_delay_ser: Series = calc_df["delay"][finite_mask]
    finite_data_size_ser: pd.Series = calc_df["data_size"][finite_mask]

    if weighted_data_size:
        # Average delay weighted by each packet's data size.
        average_delay: float = np.average(finite_delay_ser, weights=finite_data_size_ser)
    else:
        # Plain (unweighted) average delay.
        average_delay = finite_delay_ser.mean()

    logger.info(f"Average delay: {average_delay} s")

    return average_delay

calculate_average_increased_delay

calculate_average_increased_delay(
    time_baseline: datetime64,
    *,
    time_from: datetime64 | None = None,
    time_to: datetime64 | None = None,
    weighted_data_size: bool = True
) -> float

到着したデータについて平均遅延時間の、ある時刻の遅延時間に対する増加量を計算する.

Source code in src/cosmica/experimental_packet_routing/routing_result_visualizer.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def calculate_average_increased_delay(
    self,
    time_baseline: np.datetime64,
    *,
    time_from: np.datetime64 | None = None,
    time_to: np.datetime64 | None = None,
    weighted_data_size: bool = True,
) -> float:
    """Compute how much the average delay increased relative to a baseline time.

    The (optionally data-size-weighted) average delay of arrived packets in
    the ``[time_from, time_to]`` generation window is compared against the
    delay of the packet whose generation time is closest to *time_baseline*.

    Returns:
        Average delay minus the baseline packet's delay, in seconds.
    """
    calc_df = pd.DataFrame(
        {
            "generated_time": [
                comm_data_demand.generated_time
                for comm_data_demand in self.packet_routing_result.comm_data_demand_list
            ],
            "delay": [
                comm_data_demand.delay for comm_data_demand in self.packet_routing_result.comm_data_demand_list
            ],
            "data_size": [
                comm_data_demand.data_size for comm_data_demand in self.packet_routing_result.comm_data_demand_list
            ],
        },
    )

    # Restrict to the requested generation-time window.
    if time_from is not None:
        calc_df = calc_df[calc_df["generated_time"] >= time_from]
    if time_to is not None:
        calc_df = calc_df[calc_df["generated_time"] <= time_to]

    # Keep only finite delays (packet-lost data has delay == inf).
    finite_delay_ser: Series = calc_df["delay"][np.isfinite(calc_df["delay"])]
    finite_data_size_ser: pd.Series = calc_df["data_size"][np.isfinite(calc_df["delay"])]

    if weighted_data_size:
        # Average delay weighted by each packet's data size.
        average_delay: float = np.average(
            finite_delay_ser,
            weights=finite_data_size_ser,
        )
    else:
        # Plain (unweighted) average delay.
        average_delay = finite_delay_ser.mean()

    # Difference from the delay of the packet generated closest to the baseline.
    closest_idx = (calc_df["generated_time"] - pd.to_datetime(time_baseline)).abs().idxmin()
    average_increased_delay = average_delay - pd.to_numeric(
        calc_df.loc[closest_idx, "delay"],
    )

    logger.info(f"Average increased delay: {average_increased_delay} s")

    # BUG FIX: previously returned `average_delay`, contradicting both the
    # method name and the value logged just above.
    return average_increased_delay

calculate_average_packet_loss_rate

calculate_average_packet_loss_rate(
    *,
    time_from: datetime64 | None = None,
    time_to: datetime64 | None = None,
    weighted_data_size: bool = True
) -> float

生成データの平均パケットロス率を計算する.

Source code in src/cosmica/experimental_packet_routing/routing_result_visualizer.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def calculate_average_packet_loss_rate(
    self,
    *,
    time_from: np.datetime64 | None = None,
    time_to: np.datetime64 | None = None,
    weighted_data_size: bool = True,
) -> float:
    """Compute the average packet-loss rate of generated demand data.

    The rate is one minus the (optionally data-size-weighted) fraction of
    packets that reached their destination within the generation-time window.
    """
    demand_list = self.packet_routing_result.comm_data_demand_list
    calc_df = pd.DataFrame(
        {
            "generated_time": [comm.generated_time for comm in demand_list],
            "reach_dst": [comm.reach_dst for comm in demand_list],
            "data_size": [comm.data_size for comm in demand_list],
        },
    )

    # Restrict to the requested generation-time window.
    if time_from is not None:
        calc_df = calc_df[calc_df["generated_time"] >= time_from]
    if time_to is not None:
        calc_df = calc_df[calc_df["generated_time"] <= time_to]

    if weighted_data_size:
        # Reach rate weighted by each packet's data size.
        average_packet_reach_rate: float = np.average(calc_df["reach_dst"], weights=calc_df["data_size"])
    else:
        average_packet_reach_rate = calc_df["reach_dst"].mean()

    average_packet_loss_rate = 1 - average_packet_reach_rate
    logger.info(f"Average packet loss rate: {average_packet_loss_rate}")

    return average_packet_loss_rate

plot_accumulated_arrival_data_size

plot_accumulated_arrival_data_size(
    save_path: Path,
    *,
    with_title: bool = False,
    title_fontsize: int = 16,
    label_fontsize: int = 14,
    legend_fontsize: int = 12,
    tick_label_fontsize: int = 12,
    legend_loc: str = "best",
    xlim: tuple | None = None,
    ylim: tuple | None = None,
    dpi: int = 300,
    with_grid: bool = False,
    use_time_from_epoch: bool = False
) -> None
Source code in src/cosmica/experimental_packet_routing/routing_result_visualizer.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
def plot_accumulated_arrival_data_size(
    self,
    save_path: Path,
    *,
    with_title: bool = False,
    title_fontsize: int = 16,
    label_fontsize: int = 14,
    legend_fontsize: int = 12,
    tick_label_fontsize: int = 12,
    legend_loc: str = "best",
    xlim: tuple | None = None,
    ylim: tuple | None = None,
    dpi: int = 300,
    with_grid: bool = False,
    use_time_from_epoch: bool = False,
    # link_failure_timing=None,
) -> None:
    """Plot the cumulative arrival data size [bit] over time and save the figure.

    Only demand packets that reached their destination contribute to the
    curve; the x-axis is wall-clock time, or seconds from epoch when
    ``use_time_from_epoch`` is True.
    """
    ## Convert comm_data_demand_list into a pandas DataFrame
    # Plot only data that arrived or was packet-lost.
    # NOTE(review): the filter below actually keeps *only* arrived rows.
    plot_df = pd.DataFrame(
        {
            "generated_time": [
                comm_data_demand.generated_time
                for comm_data_demand in self.packet_routing_result.comm_data_demand_list
            ],
            "delay": [
                comm_data_demand.delay for comm_data_demand in self.packet_routing_result.comm_data_demand_list
            ],
            "data_size": [
                comm_data_demand.data_size for comm_data_demand in self.packet_routing_result.comm_data_demand_list
            ],
            "reach_dst": [
                comm_data_demand.reach_dst for comm_data_demand in self.packet_routing_result.comm_data_demand_list
            ],
        },
    )
    plot_df = plot_df[plot_df["reach_dst"]]

    # Arrival time = generation time + experienced delay.
    plot_df["arrival_time"] = plot_df["generated_time"] + pd.to_timedelta(plot_df["delay"], unit="s")
    plot_df = plot_df.sort_values("arrival_time")

    # NOTE(review): after the filter above, "reach_dst" is a single group,
    # so this groupby-cumsum is equivalent to a plain cumsum.
    plot_df["accumulated_arrival_data_size"] = plot_df.groupby("reach_dst")["data_size"].cumsum()

    if use_time_from_epoch:
        x_data = (plot_df["arrival_time"] - self.time[0]) / np.timedelta64(1, "s")
    else:
        x_data = plot_df["arrival_time"]

    ## Plot
    fig, ax = plt.subplots(figsize=(10, 6), dpi=dpi)

    ax.plot(
        x_data,
        plot_df["accumulated_arrival_data_size"],
        label="Accumulated arrival data size",
        color="blue",
    )

    # Customizing title and labels font size
    if with_title:
        ax.set_title("Accumulated Arrival Data Size", fontsize=title_fontsize)
    ax.set_ylabel("Accumulated arrival data size [bit]", fontsize=label_fontsize)
    if use_time_from_epoch:
        ax.set_xlabel("Time from Epoch [s]", fontsize=label_fontsize)
    else:
        ax.set_xlabel("Time", fontsize=label_fontsize)
        # Formatting the x-axis with date and time, and rotating the text
        ax.xaxis.set_major_formatter(DateFormatter("%Y/%m/%d %H:%M:%S"))
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right")

    # Customizing tick label font size
    ax.tick_params(axis="both", labelsize=tick_label_fontsize)

    # Axis range settings
    if use_time_from_epoch:
        ax.set_xlim(xlim if xlim else (self.time_from_epoch[0], self.time_from_epoch[-1]))
    else:
        ax.set_xlim(xlim if xlim else (self.time[0], self.time[-1]))
    ax.set_ylim(ylim if ylim else (0, None))

    if with_grid:
        ax.xaxis.set_minor_locator(AutoMinorLocator(5))
        ax.yaxis.set_minor_locator(AutoMinorLocator(5))
        ax.grid(which="major", color="#CCCCCC", linestyle="--")
        ax.grid(which="minor", color="#CCCCCC", linestyle=":")

    # Legend settings
    ax.legend(fontsize=legend_fontsize, loc=legend_loc)

    # Save
    # NOTE(review): the figure is never closed; repeated calls accumulate
    # open matplotlib figures — consider plt.close(fig).
    fig.savefig(save_path, bbox_inches="tight")
    logger.info(f"Saved visualization: {save_path}")

plot_arrival_data_rate

plot_arrival_data_rate(
    save_path: Path,
    *,
    with_title: bool = False,
    title_fontsize: int = 16,
    label_fontsize: int = 14,
    legend_fontsize: int = 12,
    tick_label_fontsize: int = 12,
    legend_loc: str = "best",
    xlim: tuple | None = None,
    ylim: tuple | None = None,
    dpi: int = 300,
    with_grid: bool = False,
    use_time_from_epoch: bool = False
) -> None
Source code in src/cosmica/experimental_packet_routing/routing_result_visualizer.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
def plot_arrival_data_rate(
    self,
    save_path: Path,
    *,
    with_title: bool = False,
    title_fontsize: int = 16,
    label_fontsize: int = 14,
    legend_fontsize: int = 12,
    tick_label_fontsize: int = 12,
    legend_loc: str = "best",
    xlim: tuple | None = None,
    ylim: tuple | None = None,
    dpi: int = 300,
    with_grid: bool = False,
    use_time_from_epoch: bool = False,
    # link_failure_timing=None,
) -> None:
    """Plot the per-time-step arrival data rate [bit/s] and save the figure.

    Arrived demand packets are binned into the simulation time steps by
    their arrival time; each bin's total data size is divided by the step
    duration to obtain a rate.
    """
    # --- 1. Shape the required data --------------------------------------------
    plot_df = pd.DataFrame(
        {
            "generated_time": [
                comm_data_demand.generated_time
                for comm_data_demand in self.packet_routing_result.comm_data_demand_list
            ],
            "delay": [
                comm_data_demand.delay for comm_data_demand in self.packet_routing_result.comm_data_demand_list
            ],
            "data_size": [
                comm_data_demand.data_size for comm_data_demand in self.packet_routing_result.comm_data_demand_list
            ],
            "reach_dst": [
                comm_data_demand.reach_dst for comm_data_demand in self.packet_routing_result.comm_data_demand_list
            ],
        },
    )
    plot_df = plot_df[plot_df["reach_dst"]]
    plot_df["arrival_time"] = (plot_df["generated_time"] + pd.to_timedelta(plot_df["delay"], unit="s")).astype(
        "datetime64[ms]",
    )

    # --- 2. Time-step information ----------------------------------------------
    time_index = pd.Series(self.time).astype("datetime64[ms]")
    dt_seconds = np.asarray(self.time_step_array, dtype=float)

    # --- 3. Assign each arrival to the preceding time_index ---------------------
    # np.searchsorted gives the insertion point; the index one before it is
    # the time step the arrival belongs to.
    idx = np.searchsorted(time_index.to_numpy(), plot_df["arrival_time"].to_numpy(), side="right") - 1
    valid = idx >= 0  # negative indices fall before the epoch, so exclude them

    # --- 4. Accumulate data size per step --------------------------------------
    bytes_per_step = np.zeros(len(time_index), dtype=float)
    np.add.at(bytes_per_step, idx[valid], plot_df["data_size"].to_numpy()[valid])

    arrival_rate = bytes_per_step / dt_seconds

    # --- 5. Visualization -------------------------------------------------------
    x_data = (time_index - time_index.iloc[0]).dt.total_seconds() if use_time_from_epoch else time_index

    fig, ax = plt.subplots(figsize=(10, 6), dpi=dpi)
    ax.plot(x_data, arrival_rate, label="Arrival data rate", lw=1)

    if with_title:
        ax.set_title("Arrival Data Rate", fontsize=title_fontsize)
    ax.set_ylabel("Arrival data rate [bit/s]", fontsize=label_fontsize)
    if use_time_from_epoch:
        ax.set_xlabel("Time from Epoch [s]", fontsize=label_fontsize)
    else:
        ax.set_xlabel("Time", fontsize=label_fontsize)
        ax.xaxis.set_major_formatter(DateFormatter("%Y/%m/%d %H:%M:%S"))
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right")

    ax.tick_params(axis="both", labelsize=tick_label_fontsize)
    ax.set_xlim(xlim if xlim else (x_data.iloc[0], x_data.iloc[-1]))
    ax.set_ylim(ylim if ylim else (0, None))

    if with_grid:
        ax.xaxis.set_minor_locator(AutoMinorLocator(5))
        ax.yaxis.set_minor_locator(AutoMinorLocator(5))
        ax.grid(which="major", linestyle="--", alpha=0.6)
        ax.grid(which="minor", linestyle=":", alpha=0.4)

    ax.legend(fontsize=legend_fontsize, loc=legend_loc)
    # NOTE(review): the figure is never closed; repeated calls accumulate
    # open matplotlib figures — consider plt.close(fig).
    fig.savefig(save_path, bbox_inches="tight")
    logger.info(f"Saved visualization: {save_path}")

plot_delay

plot_delay(
    save_path: Path,
    *,
    with_title: bool = False,
    title_fontsize: int = 16,
    label_fontsize: int = 14,
    legend_fontsize: int = 12,
    tick_label_fontsize: int = 12,
    legend_loc: str = "best",
    xlim: tuple | None = None,
    ylim: tuple | None = None,
    dpi: int = 300,
    with_grid: bool = False,
    use_time_from_epoch: bool = False,
    replace_inf: bool = False
) -> None
Source code in src/cosmica/experimental_packet_routing/routing_result_visualizer.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def plot_delay(
    self,
    save_path: Path,
    *,
    with_title: bool = False,
    title_fontsize: int = 16,
    label_fontsize: int = 14,
    legend_fontsize: int = 12,
    tick_label_fontsize: int = 12,
    legend_loc: str = "best",
    xlim: tuple | None = None,
    ylim: tuple | None = None,
    dpi: int = 300,
    with_grid: bool = False,
    use_time_from_epoch: bool = False,
    replace_inf: bool = False,
    # link_failure_timing=None,
) -> None:
    """Plot per-packet delay against generation time and save the figure.

    Packet-lost data carries an infinite delay; with ``replace_inf`` those
    values are clamped to a large constant so they appear on the plot.
    """
    ## Convert comm_data_demand_list into a pandas DataFrame
    # Plot only data that arrived or was packet-lost.
    # NOTE(review): no such filter is applied below — every demand packet
    # is plotted; confirm whether filtering was intended.
    plot_df = pd.DataFrame(
        {
            "generated_time": [
                comm_data_demand.generated_time
                for comm_data_demand in self.packet_routing_result.comm_data_demand_list
            ],
            "delay": [
                comm_data_demand.delay for comm_data_demand in self.packet_routing_result.comm_data_demand_list
            ],
        },
    )
    plot_df = plot_df.sort_values("generated_time")

    if use_time_from_epoch:
        x_data = (plot_df["generated_time"] - self.time[0]) / np.timedelta64(1, "s")
    else:
        x_data = plot_df["generated_time"]

    # Largest finite delay, used below to scale the y-axis.
    max_delay = plot_df["delay"][plot_df["delay"] != np.inf].max()

    if replace_inf:
        # np.inf is not plotted, so replace it with an arbitrarily large value
        plot_df["delay"] = plot_df["delay"].replace(np.inf, 1e5)

    ## Plot
    fig, ax = plt.subplots(figsize=(10, 6), dpi=dpi)

    ax.plot(x_data, plot_df["delay"], label="Delay", color="blue")

    # Customizing title and labels font size
    if with_title:
        ax.set_title("Delay", fontsize=title_fontsize)
    ax.set_ylabel("Average delay time [s]", fontsize=label_fontsize)
    if use_time_from_epoch:
        ax.set_xlabel("Time from epoch[s]", fontsize=label_fontsize)
    else:
        ax.set_xlabel("Time", fontsize=label_fontsize)
        # Formatting the x-axis with date and time, and rotating the text
        ax.xaxis.set_major_formatter(DateFormatter("%Y/%m/%d %H:%M:%S"))
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right")

    # Customizing tick label font size
    ax.tick_params(axis="both", labelsize=tick_label_fontsize)

    # Axis range settings
    if use_time_from_epoch:
        ax.set_xlim(xlim if xlim else (self.time_from_epoch[0], self.time_from_epoch[-1]))
    else:
        ax.set_xlim(xlim if xlim else (self.time[0], self.time[-1]))

    # Fall back to an open-ended y-axis when no finite delay exists.
    if max_delay == np.inf or np.isnan(max_delay):
        ax.set_ylim(0, None)
    else:
        ax.set_ylim(ylim if ylim else (0, max_delay * 1.1))

    if with_grid:
        ax.xaxis.set_minor_locator(AutoMinorLocator(5))
        ax.yaxis.set_minor_locator(AutoMinorLocator(5))
        ax.grid(which="major", color="#CCCCCC", linestyle="--")
        ax.grid(which="minor", color="#CCCCCC", linestyle=":")

    # Legend settings
    ax.legend(fontsize=legend_fontsize, loc=legend_loc)

    # Save
    # NOTE(review): the figure is never closed; repeated calls accumulate
    # open matplotlib figures — consider plt.close(fig).
    fig.savefig(save_path, bbox_inches="tight")
    logger.info(f"Saved visualization: {save_path}")

plot_graph_animation

plot_graph_animation(
    time_index_from: int,
    time_index_to: int,
    time_index_step: int,
    dynamics_data: DynamicsData[ConstellationSatellite],
    save_path: Path,
    *,
    with_demand_data: bool = True,
    with_lsa_data: bool = False,
    dpi: int = 100
) -> None

時間経過に伴うグラフの変化をアニメーションで描画する.

Source code in src/cosmica/experimental_packet_routing/routing_result_visualizer.py
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
def plot_graph_animation(
    self,
    time_index_from: int,
    time_index_to: int,
    time_index_step: int,
    dynamics_data: DynamicsData[ConstellationSatellite],
    save_path: Path,
    *,
    with_demand_data: bool = True,
    with_lsa_data: bool = False,
    dpi: int = 100,
) -> None:
    """Animate how the graph changes over the selected range of time indices.

    One animation frame is rendered per time index in
    ``range(time_index_from, time_index_to, time_index_step)`` and the result
    is written to ``save_path``.
    """
    # TODO(Takashima): accept a time value instead of time indices
    frame_indices = range(time_index_from, time_index_to, time_index_step)

    fig, ax = plt.subplots(figsize=(15, 8))

    def update(frame: int):  # noqa: ANN202
        # Redraw the whole axes for each frame.
        ax.clear()
        t_idx = frame_indices[frame]

        demand_edges = self._get_edges_with_demand_data(t_idx) if with_demand_data else set()
        lsa_edges = self._get_edges_with_lsa_data(t_idx) if with_lsa_data else set()

        ax.set_title(f"Time: {np.datetime_as_string(self.time[t_idx], unit='ms').split('T')[1]}")

        draw_lat_lon_grid(ax=ax)
        draw_countries(ax=ax)
        draw_snapshot(
            graph=self.packet_routing_result.all_graphs_after_simulation[t_idx],
            dynamics_data=dynamics_data[t_idx],
            ax=ax,
            with_labels=False,
            focus_edges_list=[demand_edges, lsa_edges],
            focus_edges_label_list=["Demand data", "LSA data"],
        )
        ax.legend(loc="lower left")

    animation = FuncAnimation(fig, update, frames=len(frame_indices), interval=100)
    animation.save(
        save_path,
        dpi=dpi,
    )

plot_graph_at_certain_time

plot_graph_at_certain_time(
    time: datetime64,
    dynamics_data: DynamicsData[ConstellationSatellite],
    save_path: Path,
    *,
    with_demand_data: bool = True,
    with_lsa_data: bool = False,
    dpi: int = 100
) -> None

指定した時間のグラフを描画する.

Source code in src/cosmica/experimental_packet_routing/routing_result_visualizer.py
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
def plot_graph_at_certain_time(
    self,
    time: np.datetime64,
    dynamics_data: DynamicsData[ConstellationSatellite],
    save_path: Path,
    *,
    with_demand_data: bool = True,
    with_lsa_data: bool = False,
    dpi: int = 100,
) -> None:
    """Plot the graph snapshot at the given time and save it to ``save_path``.

    Raises:
        ValueError: If ``time`` does not appear in ``self.time``.
    """
    matches = np.where(self.time == time)[0]
    if len(matches) == 0:
        msg = f"Time {time} not found in self.time array."
        raise ValueError(msg)
    t_idx = matches[0]

    demand_edges = self._get_edges_with_demand_data(t_idx) if with_demand_data else set()
    lsa_edges = self._get_edges_with_lsa_data(t_idx) if with_lsa_data else set()

    fig, ax = plt.subplots(figsize=(15, 8))
    ax.set_title(f"Time: {np.datetime_as_string(time, unit='ms').split('T')[1]}")

    draw_lat_lon_grid(ax=ax)
    draw_countries(ax=ax)
    draw_snapshot(
        graph=self.packet_routing_result.all_graphs_after_simulation[t_idx],
        dynamics_data=dynamics_data[t_idx],
        ax=ax,
        with_labels=False,
        focus_edges_list=[demand_edges, lsa_edges],
        focus_edges_label_list=["Demand data", "LSA data"],
    )
    ax.legend(loc="lower left")

    # Write the figure to disk.
    fig.savefig(save_path, bbox_inches="tight", dpi=dpi)
    logger.info(f"Saved visualization: {save_path}")

plot_graph_over_time

plot_graph_over_time(
    time_index_from: int,
    time_index_to: int,
    time_index_step: int,
    dynamics_data: DynamicsData[ConstellationSatellite],
    save_path: Path,
    *,
    with_demand_data: bool = True,
    with_lsa_data: bool = False,
    dpi: int = 100
) -> None

時間経過に伴うグラフの変化を描画する.

Source code in src/cosmica/experimental_packet_routing/routing_result_visualizer.py
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
def plot_graph_over_time(
    self,
    time_index_from: int,
    time_index_to: int,
    time_index_step: int,
    dynamics_data: DynamicsData[ConstellationSatellite],
    save_path: Path,
    *,
    with_demand_data: bool = True,
    with_lsa_data: bool = False,
    dpi: int = 100,
) -> None:
    """Plot the graph at each selected time index on a grid of subplots.

    Args:
        time_index_from: First time index (inclusive).
        time_index_to: Last time index (exclusive).
        time_index_step: Stride between plotted time indices.
        dynamics_data: Per-time-step dynamics used by ``draw_snapshot``.
        save_path: Output image path.
        with_demand_data: Highlight edges carrying demand data.
        with_lsa_data: Highlight edges carrying LSA data.
        dpi: Resolution of the saved figure.
    """
    # TODO(Takashima): accept a time value instead of time indices
    time_indices = range(time_index_from, time_index_to, time_index_step)
    # len(range(...)) rounds up; the previous floor division undercounted
    # frames when the span was not divisible by the step, causing an
    # IndexError for the last subplot(s).
    n_fig = len(time_indices)
    ncols = 3
    nrows = max(1, (n_fig + ncols - 1) // ncols)

    # squeeze=False keeps `axes` 2-D even when nrows == 1, so the
    # axes[row, col] indexing below is always valid.
    fig, axes = plt.subplots(
        nrows=nrows,
        ncols=ncols,
        figsize=(15 * ncols, 8 * nrows),
        squeeze=False,
    )
    fig.subplots_adjust(wspace=0.1, hspace=0.1)

    for idx, time_index in enumerate(time_indices):
        row, col = divmod(idx, ncols)

        edges_with_demand_data = self._get_edges_with_demand_data(time_index) if with_demand_data else set()

        edges_with_lsa_data = self._get_edges_with_lsa_data(time_index) if with_lsa_data else set()

        title = f"Time: {np.datetime_as_string(self.time[time_index], unit='ms').split('T')[1]}"
        axes[row, col].set_title(title)

        draw_lat_lon_grid(ax=axes[row, col])
        draw_countries(ax=axes[row, col])
        draw_snapshot(
            graph=self.packet_routing_result.all_graphs_after_simulation[time_index],
            dynamics_data=dynamics_data[time_index],
            ax=axes[row, col],
            with_labels=False,
            focus_edges_list=[edges_with_demand_data, edges_with_lsa_data],
            focus_edges_label_list=["Demand data", "LSA data"],
        )

    # Hide any unused trailing axes in the grid.
    for idx in range(n_fig, nrows * ncols):
        row, col = divmod(idx, ncols)
        axes[row, col].axis("off")

    # Write the figure to disk.
    fig.savefig(save_path, bbox_inches="tight", dpi=dpi)
    logger.info(f"Saved visualization: {save_path}")

SpaceTimeGraph dataclass

SpaceTimeGraph(
    time_for_snapshots: list[datetime64],
    graph_for_snapshots: list[Graph],
)

Container pairing each snapshot time with the graph that becomes valid at that time.

Source code in src/cosmica/experimental_packet_routing/space_time_graph.py
27
28
29
30
31
32
33
def __init__(
    self,
    time_for_snapshots: list[np.datetime64],
    graph_for_snapshots: list[Graph],
) -> None:
    """Store the snapshot times and their corresponding graphs."""
    self.graph_for_snapshots = graph_for_snapshots
    self.time_for_snapshots = time_for_snapshots

graph_for_snapshots instance-attribute

graph_for_snapshots: list[Graph] = graph_for_snapshots

time_for_snapshots instance-attribute

time_for_snapshots: list[datetime64] = time_for_snapshots

get_space_time_graph_at_time

get_space_time_graph_at_time(time: datetime64) -> Graph

Get the space time graph at the specified time.

Source code in src/cosmica/experimental_packet_routing/space_time_graph.py
169
170
171
172
def get_space_time_graph_at_time(self, time: np.datetime64) -> Graph:
    """Return the snapshot graph in effect at ``time``.

    Picks the latest snapshot whose start time is not after ``time``.
    Raises ValueError (from ``max`` on an empty sequence) if ``time``
    precedes every snapshot.
    """
    valid_indices = (idx for idx, snapshot_time in enumerate(self.time_for_snapshots) if snapshot_time <= time)
    return self.graph_for_snapshots[max(valid_indices)]

make_space_time_graph_from_graph classmethod

make_space_time_graph_from_graph(
    time: NDArray[datetime64],
    graphs: list[Graph],
    *,
    check_interval_time: timedelta64 | None = None
) -> Self

Build the snapshot time and graph lists, recording a new snapshot whenever the graph topology changes (or aggregating over fixed intervals when check_interval_time is given).

Source code in src/cosmica/experimental_packet_routing/space_time_graph.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
@classmethod
def make_space_time_graph_from_graph(  # noqa: C901, PLR0912
    cls,
    time: npt.NDArray[np.datetime64],
    graphs: list[Graph],
    *,
    check_interval_time: np.timedelta64 | None = None,
) -> Self:
    """Build a space-time graph by collapsing per-step graphs into snapshots.

    With ``check_interval_time`` set to None, a snapshot is recorded whenever
    the topology changes (tested with ``nx.is_isomorphic``, which compares
    structure only, not node/edge attributes). Otherwise the timeline is
    aggregated on a fixed grid of interval boundaries: each boundary graph
    keeps every edge seen during the past interval, drops edges that are not
    present throughout the coming interval, takes node/edge attributes from
    the interval mid-point, and recomputes ``weight`` values.

    Args:
        time: Sample times, one per entry of ``graphs``; assumed ascending —
            TODO confirm (``np.searchsorted`` below relies on sorted input).
        graphs: Graph observed at each time step.
        check_interval_time: Optional aggregation interval; None keeps the
            per-change snapshot behaviour.

    Returns:
        A new instance holding the deduplicated snapshot times and graphs.
    """
    # ──────────────────────────────────────────────────────────────────────
    if check_interval_time is None:
        # Old behaviour (unchanged) ───────────────────────────────────────
        time_for_snapshots: list[np.datetime64] = [time[0]]
        graph_for_snapshots: list[Graph] = [copy.deepcopy(graphs[0])]
        for _time, _graph in tqdm(zip(time[1:], graphs[1:], strict=False), total=len(time) - 1):
            # Structure-only comparison: attribute-only changes do not
            # trigger a new snapshot.
            if not nx.is_isomorphic(_graph, graph_for_snapshots[-1]):
                time_for_snapshots.append(_time)
                graph_for_snapshots.append(copy.deepcopy(_graph))
        return cls(
            time_for_snapshots=time_for_snapshots,
            graph_for_snapshots=graph_for_snapshots,
        )

    # ──────────────────────────────────────────────────────────────────────
    # New t-second aggregation logic
    # ──────────────────────────────────────────────────────────────────────
    start, stop = time[0], time[-1]
    # Interval boundaries: t, 2t, …, ≤ stop
    # (arange stops before stop + check_interval_time, so the final sample
    # time is covered by the last boundary).
    boundaries: npt.NDArray[np.datetime64] = np.arange(
        start,
        stop + check_interval_time,
        check_interval_time,
    )

    time_for_snapshots = []
    graph_for_snapshots = []

    for boundary in tqdm(boundaries, total=len(boundaries)):
        # ── indices for past / future windows ────────────────────────────
        # past: (boundary - t, boundary]; future: (boundary, boundary + t).
        past_mask = (time > boundary - check_interval_time) & (time <= boundary)
        future_mask = (time > boundary) & (time < boundary + check_interval_time)

        if not past_mask.any() and not future_mask.any():
            continue  # no information

        past_idx = np.where(past_mask)[0]
        future_idx = np.where(future_mask)[0]

        # ── graph defined exactly at (or just before) boundary ───────────
        idx_boundary = max(0, int(np.searchsorted(time, boundary, side="right")) - 1)
        graph_at_boundary = copy.deepcopy(graphs[idx_boundary])

        # ── ① add every edge that ever appeared in the past window ───────
        edges_past = set().union(*(graphs[i].edges() for i in past_idx)) if past_idx.size else set()
        for u, v in edges_past:
            # Direction-insensitive check; attributes are copied from the
            # most recent past graph that contains the edge.
            if not has_edge_bidirectional(graph_at_boundary, u, v):
                last_i = max(i for i in past_idx if has_edge_bidirectional(graphs[i], u, v))
                # Find the actual edge direction that exists
                if graphs[last_i].has_edge(u, v):
                    graph_at_boundary.add_edge(u, v, **graphs[last_i].edges[u, v])
                else:
                    graph_at_boundary.add_edge(u, v, **graphs[last_i].edges[v, u])

        # ── ② remove edges that will disappear in the coming interval ────
        if future_idx.size:
            # Keep only edges present in EVERY future graph of the window.
            stable_future = set.intersection(*(set(graphs[i].edges()) for i in future_idx))
            for u, v in list(graph_at_boundary.edges()):
                if (u, v) not in stable_future and (v, u) not in stable_future:
                    remove_edge_safe(graph_at_boundary, u, v)

        # ── ③ overwrite attributes with those at the mid-point (n+½)t ───
        half = check_interval_time // np.int64(2)
        mid_time = boundary + half  # (n+½)t
        # Clamp to the last sample so mid-points past the end stay valid.
        mid_i = min(len(time) - 1, int(np.searchsorted(time, mid_time, side="left")))
        graph_at_midpoint = graphs[mid_i]

        for node, data in graph_at_midpoint.nodes(data=True):
            if graph_at_boundary.has_node(node):
                graph_at_boundary.nodes[node].update(data)

        for u, v, data in graph_at_midpoint.edges(data=True):
            if has_edge_bidirectional(graph_at_boundary, u, v):
                # Update the edge that actually exists in the boundary graph
                if graph_at_boundary.has_edge(u, v):
                    graph_at_boundary.edges[u, v].update(data)
                else:
                    graph_at_boundary.edges[v, u].update(data)

        # ── ④ recompute weights (edge delay + max node delay) ───────────
        for u, v, d in graph_at_boundary.edges(data=True):
            edge_delay = d.get("delay", 0)
            node_delay = max(graph_at_boundary.nodes[u].get("delay", 0), graph_at_boundary.nodes[v].get("delay", 0))
            graph_at_boundary.edges[u, v]["weight"] = edge_delay + node_delay
        for n in graph_at_boundary.nodes:
            graph_at_boundary.nodes[n]["weight"] = graph_at_boundary.nodes[n].get("delay", 0)

        # ── ⑤ store snapshot, skipping duplicates ───────────────────────
        # NOTE(review): is_isomorphic ignores attributes, so a boundary whose
        # topology matches the previous snapshot is skipped even if its
        # delays/weights differ — confirm this is intended.
        if (not graph_for_snapshots) or (not nx.is_isomorphic(graph_at_boundary, graph_for_snapshots[-1])):
            time_for_snapshots.append(boundary)
            graph_for_snapshots.append(graph_at_boundary)

    return cls(
        time_for_snapshots=time_for_snapshots,
        graph_for_snapshots=graph_for_snapshots,
    )

update_space_time_graph_for_failure_edge

update_space_time_graph_for_failure_edge(
    failure_edge: tuple[Node, Node], update_time: datetime64
) -> Self

Update the space time graph for the failure edge.

Source code in src/cosmica/experimental_packet_routing/space_time_graph.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def update_space_time_graph_for_failure_edge(
    self,
    failure_edge: tuple[Node, Node],
    update_time: np.datetime64,
) -> Self:
    """Remove a failed edge from all snapshots valid at or after ``update_time``.

    A new snapshot (a deep copy of the one in effect at ``update_time``, minus
    the failed edge) is inserted so that times before ``update_time`` still see
    the pre-failure topology. The object is mutated in place and returned.

    NOTE(review): raises ValueError (``max`` over an empty sequence) when
    ``update_time`` precedes the first snapshot time — confirm callers
    guarantee this cannot happen.
    """
    # Find the index of the closest time snapshot before or at graph_update_time
    closest_snapshot_index: int = max(i for i, time in enumerate(self.time_for_snapshots) if time <= update_time)

    # If the closest snapshot is the last snapshot, add a new snapshot
    if closest_snapshot_index == len(self.time_for_snapshots) - 1:
        # Deep copy so the existing snapshot keeps the edge for earlier times.
        graph: Graph = copy.deepcopy(self.graph_for_snapshots[closest_snapshot_index])
        if remove_edge_safe(graph, *failure_edge):
            self.time_for_snapshots.append(update_time)
            self.graph_for_snapshots.append(graph)

    else:
        # Failure persists: strip the edge from every later snapshot in place.
        # This happens even if the insertion below turns out to be a no-op.
        for i in range(closest_snapshot_index + 1, len(self.time_for_snapshots)):
            graph = self.graph_for_snapshots[i]
            remove_edge_safe(graph, *failure_edge)

        # Insert a snapshot at update_time only if the edge actually existed
        # in the graph in effect at that moment.
        graph = copy.deepcopy(self.graph_for_snapshots[closest_snapshot_index])
        if remove_edge_safe(graph, *failure_edge):
            self.time_for_snapshots.insert(closest_snapshot_index + 1, update_time)
            self.graph_for_snapshots.insert(closest_snapshot_index + 1, graph)

    return self