Aiden Price
12/12/2020, 12:23 AM
When I add `apply_map()` to my flow I get this error: `ValueError: Cycle found; flows must be acyclic!`
Unfortunately the stack trace doesn't provide any hints as to where I've accidentally created a cycle in the graph.
I've done some comment-driven debugging and found that the problem is related to a case statement in the function I'm attempting to `apply_map()`.
I've also done `flow.visualize()` and it does output a picture, but because it's a big flow it's hard to spot the cycle in there.
Does anyone have any hints on how to better debug cycles in flow graphs?
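A minimal sketch of one way to locate a cycle, assuming the graph's edges can be listed as (upstream, downstream) pairs. The `find_cycle` helper and the sample task names are hypothetical and not part of Prefect; it is a plain depth-first search that returns the first cycle it meets.

```python
from typing import Dict, Hashable, Iterable, List, Optional, Tuple


def find_cycle(edges: Iterable[Tuple[Hashable, Hashable]]) -> Optional[List[Hashable]]:
    """Return one cycle as a list of nodes (first node repeated at the end), or None."""
    # Build an adjacency list; every node gets an entry, even pure sinks.
    graph: Dict[Hashable, List[Hashable]] = {}
    for upstream, downstream in edges:
        graph.setdefault(upstream, []).append(downstream)
        graph.setdefault(downstream, [])

    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current path, fully explored
    state = {node: WHITE for node in graph}
    path: List[Hashable] = []

    def visit(node: Hashable) -> Optional[List[Hashable]]:
        state[node] = GREY
        path.append(node)
        for nxt in graph[node]:
            if state[nxt] == GREY:
                # Back edge: the cycle is the slice of the path starting at nxt.
                return path[path.index(nxt):] + [nxt]
            if state[nxt] == WHITE:
                found = visit(nxt)
                if found:
                    return found
        path.pop()
        state[node] = BLACK
        return None

    for node in graph:
        if state[node] == WHITE:
            found = visit(node)
            if found:
                return found
    return None


# Hypothetical edge list, named after tasks that could appear in such a flow.
edges = [
    ("check_expiry", "generate_token"),
    ("generate_token", "ftp_put"),
    ("ftp_put", "update_key"),
    ("update_key", "check_expiry"),
]
print(find_cycle(edges))
```

With a Prefect flow, the pairs could presumably be built from the flow's edge set, for example `[(e.upstream_task, e.downstream_task) for e in flow.edges]`; that attribute layout is an assumption here and should be checked against the installed version. Using the task objects rather than their names avoids false cycles when several tasks share a name. The search is recursive, so a very deep graph may need a higher recursion limit.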
Thanks in advance.
nicholas
Aiden Price
12/12/2020, 12:39 AM
`apply_map` that isn't supplied as an argument.
`apply_map`ing over;
def token_flow(
    key: Dict[str, Any],
    az_policies: Dict[str, Any],
    ftp_client: SFTPFileSystem,
    als_path: str,
    resource: str,
    namespace: str,
    eventhub: str,
    duration: timedelta = timedelta(days=200),
) -> Dict[str, Any]:
    """A series of tasks intended for the `apply_map()` function."""
    action = check_expiry(key, duration)
    with case(action, "push"):
        token1 = generate_token(namespace, eventhub, az_policies, az_policies, duration)
        pushed = ftp_put(ftp_client, als_path, token1)
        key_pushed = update_key(key, action, upstream_tasks=[pushed])
    with case(action, "refresh"):
        refresh = az_eventhub_renew_cmd(
            resouce_group=resource,
            namespace=namespace,
            eventhub=eventhub,
            policy=az_policies,
            key=key,
        )
        refreshed = azcli(refresh)
        key_refreshed = update_key(key, action, duration, upstream_tasks=[refreshed])
    with case(action, "no-action"):
        key_no_action = update_key(key, action)
    return merge(key_pushed, key_refreshed, key_no_action)

keys = apply_map(
    func=token_flow,
    key=key_list,
    az_policies=unmapped(policies),
    ftp_client=unmapped(ftp_client),
    als_path=unmapped(als_path),
    resource=unmapped(resource),
    namespace=unmapped(namespace),
    eventhub=unmapped(eventhub),
    duration=unmapped(duration),
)
`az_policies` in the flow it works.
nicholas
Aiden Price
12/12/2020, 12:52 AM
nicholas