__remove_all_world_private_streams()
Remove all known streams that are flagged as not-public and are not owned by this agent.
__remove_all_world_related_agents()
Remove all world-related agents (masters and regular agents) from the agent's known lists.
accept_new_role(role: int)
Set the agent's role and optionally load a default behavior (private/world behaviour).
Args:
role: The integer role to assign to the agent (e.g., ROLE_PUBLIC, ROLE_WORLD_MASTER).
add_agent(peer_id: str, profile: unaiverse.networking.node.profile.NodeProfile) -> bool
Add a new known agent.
Args:
peer_id: The unique identifier of the peer.
profile: The NodeProfile object containing the peer's/agent's information.
Returns:
True if the agent was successfully added, False otherwise.
add_behav_wildcard(wildcard_from: str, wildcard_to: object)
Adds a wildcard mapping for the agent's behavior state machine.
Args:
wildcard_from: The string to be used as a wildcard.
wildcard_to: The object to replace the wildcard.
add_compatible_streams(peer_id: str, streams_in_profile: list[unaiverse.dataprops.DataProps], buffered: bool = False, add_all: bool = False, public: bool = True) -> bool
Add to the list of processor-compatible-streams those streams provided as arguments that are actually
found to be compatible with the processor (if they are pubsub, it also subscribes to them).
Args:
peer_id: The peer ID of the agent providing the streams.
streams_in_profile: A list of DataProps objects representing the streams from the peer's profile.
buffered: If True, the added streams will be of type BufferedDataStream.
add_all: If True, all streams from the profile are added, regardless of processor compatibility.
public: Consider public streams only (or private streams only).
Returns:
True if compatible streams were successfully added and subscribed to, False otherwise.
add_stream(stream: unaiverse.streams.DataStream, owned: bool = True, net_hash: str | None = None) -> dict[str, unaiverse.streams.DataStream]
Add a new stream to the set of known streams.
Args:
stream: The DataStream object to add.
owned: If True, the streams are considered owned by this agent.
net_hash: Optional network hash for the streams. If None, it will be generated.
Returns:
A dictionary containing the added stream and the possibly already present streams belonging to the same
group (stream name -> stream object).
add_streams(streams: list[unaiverse.streams.DataStream], owned: bool = True, net_hash: str | None = None) -> list[dict[str, unaiverse.streams.DataStream]]
Add a list of new streams to this environment.
Args:
streams: A list of DataStream objects to add.
owned: If True, the streams are considered owned by this agent.
net_hash: Optional network hash for the streams. If None, it will be generated for each.
Returns:
A list of dictionaries (it could be empty in case of issues), where each dictionary is what
is returned by add_stream().
agents_are_waiting()
Checks if there are any agents who have connected but have not yet been fully processed or added to the
agent's known lists. This indicates that new agents are waiting to be managed.
Returns:
True if there are waiting agents, False otherwise.
all_asked_finished()
Checks if all agents that were previously asked to perform a task (e.g., generate or learn) have sent a
completion confirmation. It compares the set of agents asked with the set of agents that have completed
the task.
Returns:
True if all agents are done, False otherwise.
all_engagements_completed()
Checks if all engagement requests that were sent have been confirmed. It returns True if there are no agents
remaining in the `_found_agents` list, implying all have been engaged with or discarded.
Returns:
True if all engagements are complete, False otherwise.
ask_gen(agent: str | None = None, u_hashes: list[str] | None = None, samples: int = 100, time: float = -1.0, timeout: float = -1.0, ask_uuid: str | None = None, ignore_uuid: bool = False)
Asking for generation.
Args:
agent: The ID of the agent to ask for generation, or a valid wildcard like "<valid_cmp>"
for a set of agents (if None the agents in self._engaged_agents will be considered).
u_hashes: A list of input stream hashes for generation. Defaults to None.
samples: The number of samples to generate. Defaults to 100.
time: The time duration for generation. Defaults to -1.
timeout: The timeout for the generation request. Defaults to -1.
ask_uuid: Specify the UUID of the action (if None - default -, it is randomly generated).
ignore_uuid: Force a None UUID instead of generating a random one.
Returns:
True if the generation request was successfully sent to at least one involved agent, False otherwise.
ask_learn(agent: str | None = None, u_hashes: list[str] | None = None, yhat_hashes: list[str] | None = None, samples: int = 100, time: float = -1.0, timeout: float = -1.0, ask_uuid: str | None = None, ignore_uuid: str | None = None)
Asking for learning to generate.
Args:
agent: The ID of the agent to ask for generation, or a valid wildcard like "<valid_cmp>"
for a set of agents (if None the agents in self._engaged_agents will be considered).
u_hashes: A list of input stream hashes for inference. Defaults to None.
yhat_hashes: A list of target stream hashes to be used for loss computation. Defaults to None.
samples: The number of samples to learn from. Defaults to 100.
time: The time duration for generation. Defaults to -1.
timeout: The timeout for the generation request. Defaults to -1.
ask_uuid: Specify the action UUID (default = None, i.e., it is automatically generated).
ignore_uuid: If Trie, the UUID is fully ignored (i.e, forced to None).
Returns:
True if the learning request was successfully sent to at least one involved agent, False otherwise.
ask_subscribe(agent: str | None = None, stream_hashes: list[str] | None = None, unsubscribe: bool = False)
Requests a remote agent or a group of agents to subscribe to or unsubscribe from a list of specified PubSub
streams. It normalizes the stream hashes and sends an action request containing the stream properties.
Args:
agent: The target agent's ID or a wildcard.
stream_hashes: A list of streams to subscribe to or unsubscribe from.
unsubscribe: A boolean to indicate if it's an unsubscription request.
Returns:
True if the request was sent to at least one agent, False otherwise.
augment_roles()
Augment the custom roles (role1, role2, etc.) with the default ones (public, world_master, etc.), generating
all the mixed roles (world_master~role1, world_master~role2, ...)
behave()
Behave in the current environment, calling the state-machines of the public and private networks.
behaving_in_world()
Checks if the agent's world-specific behavior state machine is currently active.
Returns:
True if the world behavior is active, False otherwise.
check_pref_stream(what: str = 'last')
Checks the position of the current preferred stream within the list. It can check if it's the first, last,
or if it has completed a full round, among other checks.
Args:
what: A string specifying the type of check to perform (e.g., 'first', 'last', 'last_round').
Returns:
True if the condition is met, False otherwise.
clear_world_related_data()
Destroy all the cached information that is about a world (useful when leaving a world).
compare_eval(cmp: str, thres: float, good_if_true: bool = True)
Compares the results of a previous evaluation to a given threshold or finds the best result among all
agents. It can check for minimum, maximum, or simple threshold-based comparisons, and it populates a list of
'valid' agents that passed the comparison.
Args:
cmp: The comparison operator (e.g., '<', '>', 'min').
thres: The threshold value for comparison.
good_if_true: A boolean to invert the pass/fail logic.
Returns:
True if at least one agent passed the comparison, False otherwise.
connect_by_role(role: str | list[str], filter_fcn: str | None = None, time: float = -1.0, timeout: float = -1.0)
Finds and attempts to connect with agents whose profiles match a specific role. It can be optionally
filtered by a custom function. It returns True if at least one valid agent is found.
Args:
role: The role or list of roles to search for.
filter_fcn: The name of an optional filter function.
time: The time duration for the action.
timeout: The action timeout.
Returns:
True if at least one agent is found and a connection request is made, False otherwise.
create_behav_files()
This method is called when building a world object. In your custom world-class, you can overload this method
and create the JSON files with the role-related behaviors, if you like. Recall that acting like this is not
mandatory at all: you can just manually create the JSON files, and this method will simply do nothing.
create_proc_output_streams(buffered: bool = False)
Creates the processor output streams based on the `proc_outputs` defined for the agent.
Args:
buffered: If True, the created streams will be of type BufferedDataStream.
deb(msg: str)
Print an error message to the console, if debug is enabled for this agent (it reuses the agent-out-function).
Args:
msg: The error message string to print.
disconnect_by_role(role: str | list[str])
Disconnects from all agents that match a specified role.
It finds the agents and calls the node's purge function on each.
Args:
role: A string or list of strings representing the role(s) of agents to disconnect from.
Returns:
Always True.
disconnected(agent: str | None = None, delay: float = -1.0)
Checks if a specific set of agents (by ID or wildcard) are no longer connected to the agent.
It returns False if any of the specified agents are still connected.
Args:
agent: The ID of the agent or a wildcard to check.
delay: The time (seconds) to be spent in the current state before actually considering this action.
Returns:
True if all involved agents are disconnected, False otherwise.
disengage_all()
Disengage all the previously engaged agents.
Returns:
True if the disengagement procedure was successfully executed, False otherwise.
do_gen(u_hashes: list[str] | None = None, samples: int = 100, time: float = -1.0, timeout: float = -1.0, _requester: str | list | None = None, _request_time: float = -1.0, _request_uuid: str | None = None, _completed: bool = False) -> bool
Generate a signal.
Args:
u_hashes: A list of input stream hashes for generation. Defaults to None.
samples: The number of samples to generate. Defaults to 100.
time: The max time duration for whole generation process. Defaults to -1.
timeout: The timeout for generation attempts: if calling the generate action fails for more than "timeout"
seconds, it is declared as complete. Defaults to -1.
_requester: The ID of the agent who requested generation (automatically set by the action calling routine).
_request_time: The time the generation was requested (automatically set by the action calling routine).
_request_uuid: The UUID of the generation request (automatically set by the action calling routine).
_completed: A boolean indicating if the generation is already completed (automatically set by the action
calling routine). This will tell that it is time to run a final procedure.
Returns:
True if the signal generation was successful, False otherwise.
do_learn(yhat_hashes: list[str] | None = None, u_hashes: list[str] | None = None, samples: int = 100, time: float = -1.0, timeout: float = -1.0, _requester: str | None = None, _request_time: float = -1.0, _request_uuid: str | None = None, _completed: bool = False) -> bool
Learn to generate a signal.
Args:
yhat_hashes: A list of target stream hashes to be used for loss computation. Defaults to None.
u_hashes: A list of input stream hashes for inference. Defaults to None.
samples: The number of samples to learn from. Defaults to 100.
time: The max time duration of the learning procedure. Defaults to -1.
timeout: The timeout for learning attempts: if calling the learning action fails for more than "timeout"
seconds, it is declared as complete. Defaults to -1.
_requester: The ID of the agent who requested learning (automatically set by the action calling routine).
_request_time: The time learning was requested (automatically set by the action calling routine).
_request_uuid: The UUID of the learning request (automatically set by the action calling routine).
_completed: A boolean indicating if the learning is already completed (automatically set by the action
calling routine). This will tell that it is time to run a final procedure.
Returns:
True if the signal generation was successful, False otherwise.
do_subscribe(stream_owners: list[str] | None = None, stream_props: list[str] | None = None, unsubscribe: bool = False, _requester: str | list | None = None, _request_time: float = -1.0)
Executes a subscription or unsubscription request received from another agent. It processes the stream
properties, adds or removes the streams from the agent's known streams, and handles the underlying PubSub topic
subscriptions.
Args:
stream_owners: A list of peer IDs who own the streams.
stream_props: A list of JSON-serialized stream properties.
unsubscribe: A boolean to indicate unsubscription.
_requester: The ID of the requesting agent.
_request_time: The time the request was made.
Returns:
True if the action is successful, False otherwise.
done_gen(_requester: str | None = None)
This is a way to get back the confirmation of a completed generation.
Args:
_requester: The ID of the agent who completed the generation. Defaults to None.
Returns:
True if the generation confirmation was successfully handled by this agent, False is something went wrong.
done_learn(_requester: str | None = None)
This is a way to get back the confirmation of a completed learning procedure.
Args:
_requester: The ID of the agent who completed the learning procedure. Defaults to None.
Returns:
True if the learning-complete confirmation was successfully handled by this agent, False otherwise.
done_subscribe(unsubscribe: bool = False, _requester: str | None = None)
Handles the confirmation that a subscription or unsubscription request has been completed by another agent.
It adds the requester to the set of agents that have completed their asked tasks.
Args:
unsubscribe: A boolean indicating if it was an unsubscription.
_requester: The ID of the agent who completed the task.
Returns:
Always True.
err(msg: str)
Print an error message to the console, if enabled at node level (it reuses the node-err-function).
Args:
msg: The error message string to print.
evaluate(stream_hash: str, how: str, steps: int = 100, re_offset: bool = False)
Evaluates the performance of agents that have completed a generation task. It compares the generated data
from each agent with a local stream (which can be a ground truth or reference stream) using a specified
comparison method.
Args:
stream_hash: The hash of the local stream to use for comparison.
how: The name of the comparison method to use.
steps: The number of steps to perform the evaluation.
re_offset: A boolean to indicate whether to re-offset the streams.
Returns:
True if the evaluation is successful, False otherwise.
evaluate_profile(role: int, profile: unaiverse.networking.node.profile.NodeProfile) -> bool
Evaluate if a given profile is valid for this agent based on its role. It helps in identifying and filtering
out invalid or 'cheating' profiles.
Args:
role: The expected integer role (e.g., ROLE_PUBLIC, ROLE_WORLD_MASTER) for the profile.
profile: The NodeProfile object to be evaluated.
Returns:
True if the profile is considered valid for the specified role, False otherwise.
find_agents(role: str | list[str], engage: bool = False)
Locally searches through the agent's known peers (world and public agents) to find agents with a specific
role. It populates the `_found_agents` set with the peer IDs of matching agents.
Args:
role: The role or list of roles to search for.
engage: If you want to force the found agents to be the ones that you are engaged with.
Returns:
True if at least one agent is found, False otherwise.
find_streams(peer_id: str, name_or_group: str | None = None) -> dict[str, dict[str, unaiverse.streams.DataStream]]
Find streams associated with a given peer ID and optionally by name or group.
Args:
peer_id: The peer ID of the (owner of the) streams to find.
name_or_group: Optional name or group of the streams to find.
Returns:
A dictionary where keys are network hashes and values are dictionaries of streams
(stream name to DataStream object) matching the criteria.
first_pref_stream()
Resets the internal pointer to the first stream in the list of preferred streams. This is useful for
restarting a playback or processing loop.
Returns:
True if the move is successful, False if the list is empty.
generate(input_net_hashes: list[str] | None = None, inputs: list[str | torch.Tensor | PIL.Image.Image] | None = None, first: bool = False, last: bool = False, ref_uuid: str | None = None) -> tuple[tuple[torch.Tensor] | None, int]
Generate new signals.
Args:
input_net_hashes: A list of network hashes to be considered as input streams (they will be sub-selected).
inputs: A list of data to be directly provided as input to the processor (if not None, input_net_hashes is
ignored).
first: If True, indicates this is the first generation call in a sequence.
last: If True, indicates this is the last generation call in a sequence.
ref_uuid: An optional UUID to match against input stream UUIDs (it can be None).
Returns:
A tuple containing:
- A tuple of torch.Tensor objects representing the generated output, or None if generation failed.
- An integer representing a data tag or status.
get_action_step()
Retrieve the current action step from the agent's private/world behavior.
Returns:
The current action step object from the HybridStateMachine's active action, or None if no action.
get_current_role(return_int: bool = False, ignore_base_role: bool = True) -> str | int | None
Returns the current role of the agent.
Args:
return_int: If True, returns the integer representation of the role.
ignore_base_role: If True, returns only the specific role part, not the base.
Returns:
The role as a string or integer, or None if the agent is not living in any worlds.
get_disengagement(disconnect_too: bool = False, _requester: str | None = None)
Get a disengagement request from an agent.
Args:
disconnect_too: Whether to disconnect the agent who sent the disengagement.
_requester: The ID of the agent requesting disengagement. Defaults to None.
Returns:
True if the disengagement request was successfully processed, False otherwise.
get_engagement(acceptable_role: str | None = None, sender_role: str | None = None, _requester: str | None = None)
Receive engagement from another agent whose authority is in the specified range.
Args:
acceptable_role: The role that the sender must have for engagement to be accepted. Defaults to None.
sender_role: The role of the agent sending the engagement request. Defaults to None.
_requester: The ID of the agent requesting engagement (automatically set by the action calling routine)
Returns:
True if the engagement was successfully received and confirmed, False otherwise.
get_last_streamed_data(agent_name: str)
Find streams associated with a given peer ID and optionally by name or group.
Args:
agent_name: The name of the agent.
Returns:
A list of data samples taken from all the known streams associated to the provided agent.
get_name() -> str
Returns the name of the agent or world from the node's profile.
Args:
None.
Returns:
The name of the agent or world.
get_peer_ids()
Retrieve the public and private peer IDs of the agent, from the underlying node's dynamic profile.
Returns:
A tuple containing the public peer ID and the private peer ID.
If either ID is not available, a placeholder string is returned <public_peer_id>, <private_peer_id>.
get_profile() -> unaiverse.networking.node.profile.NodeProfile
Returns the profile of the node hosting this agent/world.
Returns:
The NodeProfile of this node.
get_stream_sample(net_hash: str, sample_dict: dict[str, dict[str, torch.Tensor | None | int | str]])
Receive and process stream samples that were provided by another agent.
Args:
net_hash: The network hash identifying the source of the stream samples.
sample_dict: A dictionary where keys are stream names and values are dictionaries
containing 'data', 'data_tag', and 'data_uuid' for each sample.
Returns:
True if the stream samples were successfully processed and stored, False otherwise
(e.g., if the stream is unknown, not compatible, or data is None/stale).
got_engagement(_requester: str | None = None)
Confirm an engagement.
Args:
_requester: The ID of the agent confirming the engagement (automatically set by the action calling routine).
Returns:
True if the engagement was successfully confirmed, False otherwise.
in_world()
Check if the agent is currently operating within a 'world'.
Returns:
True if the agent is in a world, False otherwise.
is_last_action_step()
Check if the agent's current action (private/world behaviour) is on its last step.
Returns:
True if the current action was its last step, False otherwise. Returns None if there is no active action.
is_multi_steps_action()
Determines if the current action is a multistep action.
Returns:
True if the action is multistep, False otherwise.
learn_behave(state: int, last_action: int, prev_state: int)
A placeholder method for behavioral learning, intended to be implemented by child classes.
It receives state and action information to update a behavioral model.
Args:
state: The current state of the agent.
last_action: The last action taken.
prev_state: The previous state of the agent.
Returns:
An integer representing a new state, or similar feedback.
learn_generate(outputs: tuple[torch.Tensor], targets_net_hashes: list[str] | None) -> tuple[list[float] | None, list[float] | None]
Learn (i.e., update model params) by matching the given processor outputs with a set of targets (if any).
Args:
outputs: A tuple of torch.Tensor representing the outputs generated by the agent's processor.
targets_net_hashes: An optional list of network hashes identifying the streams
from which target data should be retrieved for learning.
If None, losses are evaluated without explicit targets.
Returns:
A tuple containing:
- A list of float values representing the individual loss values for each output.
Returns None if targets are specified but cannot be found.
- A list of integers representing the data tags of the given target streams (None if no targets were given).
load(where: str = 'output')
Load the agent's state from a specified location.
Args:
where: The directory path from which the agent's state should be loaded. Defaults to "output".
Returns:
The loaded AgentBasics object.
Raises:
AssertionError: If the specified load path does not exist.
IOError: If there is an issue with file operations (e.g., reading files).
load_and_refactor_action_file_and_behav_files(force_save: bool = False)
This method is called when building a world object. It loads the behavior files and refactors them.
It loads the action file agent.py. It checks consistency between the agent action files agent.py and the roles
in the behavior files.
Args:
force_save: Boolean to force the saving of the JSON and of a "pdf" folder with the PDFs of the state
machines.
merge_flat_data_stream_props()
Merge the labels of the descriptor components, across all streams, sharing them.
next_pref_stream()
Moves the internal pointer to the next stream in the list of preferred streams, which is often used for
playlist-like operations. It wraps around to the beginning if it reaches the end.
Returns:
True if the move is successful, False if the list is empty.
nop(message: str | None = None, delay: float = -1.0)
Do nothing.
Args:
message: An optional message to print. Defaults to None.
delay: The time (seconds) to be spent in the current state before actually considering this action.
Returns:
Always True.
out(msg: str)
Print a message to the console, if enabled at node level (it reuses the node-out-function).
Args:
msg: The message string to print.
proc_callback_inputs(inputs)
A callback method that saves the inputs to the processor right before execution.
Args:
inputs: The data inputs for the processor.
Returns:
The same inputs passed to the function.
proc_callback_outputs(outputs)
A callback method that saves the outputs from the processor right after execution.
Args:
outputs: The data outputs from the processor.
Returns:
The same outputs passed to the function.
received_some_asked_data(processing_fcn: str | None = None)
Checks if any of the agents that were previously asked for data (e.g., via `ask_gen`) have sent a stream
sample back. Optionally, it can process the received data with a specified function.
Args:
processing_fcn: The name of a function to process the received data.
Returns:
True if at least one data sample was received, False otherwise.
record(net_hash: str, samples: int = 100, time: float = -1.0, timeout: float = -1.0)
Records data from a specified stream into a new, owned `BufferedDataStream`. This is a multistep action
that captures a sequence of samples over time and then adds the new recorded stream to the agent's profile.
Args:
net_hash: The hash of the stream to record.
samples: The number of samples to record.
time: The time duration for recording.
timeout: The timeout for each recording attempt.
Returns:
True if a sample was successfully recorded, False otherwise.
remove_agent(peer_id: str)
Remove an agent.
Args:
peer_id: The unique identifier of the peer to remove.
remove_all_agents()
remove_all_streams(owned_too: bool = False)
Remove all not-owned streams.
Args:
owned_too: If True, also removes the owned streams of this agent (so also environmental and processor ones).
remove_peer_from_agent_status_attrs(peer_id)
remove_streams(peer_id: str, name: str | None = None, owned_too: bool = False)
Remove a known stream.
Args:
peer_id: The hash of each stream included the peer ID of the owner, so this is the peer ID associated with
the stream(s) to remove.
name: The optional name of the stream to remove. If None, all streams with this peer_id are removed.
owned_too: If True, also removes streams from the owned stream dict (so also environmental and processor).
reset_agent_status_attrs()
save(where: str = 'output')
Save the agent's state, including its processor and other attributes, to a specified location.
Args:
where: The directory path where the agent's state should be saved. Defaults to "output".
Returns:
The string "<SAVE_OK>" upon successful saving.
Raises:
IOError: If there is an issue with file operations (e.g., directory creation, writing files).
TypeError, ValueError, RuntimeError: For other potential issues during serialization or saving.
send_disengagement(send_disconnection_too: bool = False)
Ask for disengagement.
Args:
send_disconnection_too: Whether to send a disconnect-suggestion together with the disengagement.
Returns:
True if disengagement requests were successfully sent to at least one engaged agent, False otherwise.
send_engagement()
Offer engagement to the agents whose identifiers are in self._found_agents.
Returns:
True if engagement requests were successfully sent to at least one found agent, False otherwise.
send_profile_to_all()
Sends the agent's profile to all known agents.
send_stream_samples()
Collect and send stream samples from all owned streams to appropriate recipients.
set_next_action(agent: str | None, action: str, args: dict | None = None, ref_uuid: str | None = None)
Try to tell another agent what is the next action it should run.
Args:
agent: The ID of the agent to send the action to or a valid wildcard like "<valid_cmp>" for a set of agents
(if None the agents in self._engaged_agents will be considered).
action: The name of the action to be executed by the agent.
args: A dictionary of arguments for the action. Defaults to None.
ref_uuid: An optional UUID for referencing the action. Defaults to None.
Returns:
True if the action was successfully sent to the target agent or to at least one of the
involved agents (wildcard case).
set_node_info(clock: unaiverse.clock.Clock, conn: unaiverse.networking.node.connpool.ConnectionPools, profile: unaiverse.networking.node.profile.NodeProfile, out_fcn, ask_to_get_in_touch_fcn, purge_fcn, agents_waiting, print_level)
Set the required information from the node that hosts this agent.
Args:
clock: The global clock instance from the node.
conn: The connection pool manager from the node.
profile: The profile of the hosting node.
out_fcn: The function to use for general output messages.
ask_to_get_in_touch_fcn: The function to call to request getting in touch with another peer.
purge_fcn: The function to call to purge (kill/disconnect) a connection.
agents_waiting: Set of agents that connected to this node but have not been evaluated yet to be added.
print_level: The level of output printing verbosity (0, 1, 2).
set_pref_streams(net_hashes: list[str], repeat: int = 1)
Fills the agent's list of preferred streams (a playlist). It can repeat the playlist a specified number of
times and resolves user-provided stream hashes to their full network hashes.
Args:
net_hashes: A list of stream hashes to add to the playlist.
repeat: The number of times to repeat the playlist.
Returns:
Always True.
subscribe_to_pubsub_owned_streams() -> bool
Subscribes to all owned streams that are marked as PubSub.
Returns:
True if all subscriptions were successful, False otherwise.
suggest_badges_to_world(agent: str | None = None, score: float = -1.0, badge_type: str = 'completed', badge_description: str | None = None)
Suggests one or more badges to the world master for specific agents. This is typically used to reward agents
for completing tasks, such as for a competition. It sends a message with the badge details, including the score
and type, to the world master.
Args:
agent: The ID of the agent or a wildcard for which to suggest the badge.
score: The score associated with the badge.
badge_type: The type of badge (e.g., 'completed').
badge_description: An optional description for the badge.
Returns:
True if the badge suggestion was sent successfully, False otherwise.
suggest_role_to_world(agent: str | None, role: str)
Suggests a role change for one or more agents to the world master. It iterates through the involved agents,
checks if their current role differs from the suggested one, and sends a role suggestion message to the
world master.
Args:
agent: The ID of the agent or a wildcard to suggest the role for.
role: The new role to suggest (as a string).
Returns:
True if the suggestion was sent successfully, False otherwise.
update_streams_in_profile()
Updates the agent's profile with information about its owned (environmental and processor) streams.
user_stream_hash_to_net_hash(user_stream_hash: str) -> str | None
Converts a user-defined stream hash (peer_id:name_or_group) to a network hash
(peer_id::dm:... or peer_id::ps:name_or_group) by searching the known hashes in the known streams.
Args:
user_stream_hash: The user-defined stream hash string (peer_id:name_or_group).
Returns:
The corresponding network hash string (peer_id::dm:... or peer_id::ps:name_or_group), or None if not found.
wait_for_actions(agent: str, from_state: str, to_state: str, wait: bool)
Lock or unlock every action between a pair of states in the state machine of a target agent.
Args:
agent: The ID of the agent to send the action locking request to, or a valid wildcard like "<valid_cmp>"
for a set of agents (if None the agents in self._engaged_agents will be considered).
from_state: The starting state of the actions to be locked/unlocked.
to_state: The ending state of the actions to be locked/unlocked.
wait: A boolean indicating whether to wait for the actions to complete (wait == !ready).
Returns:
True if the request was successfully sent to at least one involved agent, False otherwise.
__ask_gen_or_learn(for_what: str, agent: str, u_hashes: list[str] | None, yhat_hashes: list[str] | None, samples: int = 100, time: float = -1.0, timeout: float = -1.0, ref_uuid: str | None = None)
A private helper method that encapsulates the logic for sending a 'do_gen' or 'do_learn' action request to
another agent. It handles the normalization of stream hashes, sets up recipients for direct messages, and adds
the target agent to the list of agents asked.
Args:
for_what: A string indicating whether to ask for 'gen' or 'learn'.
agent: The ID of the agent to send the request to.
u_hashes: A list of input stream hashes.
yhat_hashes: A list of target stream hashes (for learning).
samples: The number of samples.
time: The time duration.
timeout: The request timeout.
ref_uuid: The UUID for the request.
Returns:
True if the request was sent successfully, False otherwise.
__compare_streams(net_hash_a: str, net_hash_b: str, how: str = 'mse', steps: int = 100, re_offset: bool = False)
A private helper method that compares two buffered data streams based on a specified metric (e.g., MSE,
max accuracy). It handles stream compatibility checks, data retrieval, and the actual comparison, returning a
dissimilarity score.
Args:
net_hash_a: The network hash of the first stream.
net_hash_b: The network hash of the second stream.
how: The comparison metric ('mse', 'max', 'geqX').
steps: The number of samples to compare.
re_offset: A boolean to re-align stream tags before comparison.
Returns:
A tuple containing the dissimilarity score and a success flag (e.g., `(0.5, True)`).
__complete_do(do_what: str, peer_id_who_asked: str, all_hashes: list[str] | None, send_back_confirmation: bool = True)
A private helper method to be called at the end of a `do_gen` or `do_learn` action. It performs cleanup
tasks, such as clearing UUIDs on streams, and sends a confirmation message back to the requesting agent.
Args:
do_what: A string ('gen' or 'learn') indicating which task was completed.
peer_id_who_asked: The ID of the agent who requested the task.
all_hashes: A list of all stream hashes involved in the task.
send_back_confirmation: A boolean to indicate if a confirmation message should be sent.
Returns:
True if the completion process is successful, False otherwise.
__involved_agents(agent: str | None)
A private helper method that resolves an agent ID or a wildcard into a list of specific peer IDs.
It can resolve a single agent, a group of agents that passed a previous comparison (`<valid_cmp>`), or all
currently engaged agents.
Args:
agent: The agent ID or wildcard string.
Returns:
A list of peer IDs corresponding to the involved agents.
__process_streams(u_hashes: list[str] | None, yhat_hashes: list[str] | None, learn: bool = False, recipient: str | None = None, ref_uuid: str | None = None)
A private helper method that contains the core logic for processing data streams, either for generation or
learning. It reads input streams, passes them to the agent's processor, and handles the output streams.
It's designed to be called repeatedly by multistep actions like `do_gen` and `do_learn`.
Args:
u_hashes: A list of input stream hashes.
yhat_hashes: A list of target stream hashes (for learning).
learn: A boolean to indicate if the task is a learning task.
recipient: The ID of the agent to send data back to.
ref_uuid: The UUID for the request.
Returns:
True if the stream processing is successful, False otherwise.
__init__(args, kwargs)
__str__()
String representation of an agent.
Returns:
A formatted string describing the agent's current state and relationships.