Aiden Price

12/12/2020, 12:23 AM
Hi folks, when I try to add an apply_map() to my flow I get this error:
ValueError: Cycle found; flows must be acyclic!
Unfortunately the stack trace doesn't provide any hints as to where I've accidentally created a cycle in the graph. I've done some comment-driven debugging and found that the problem is related to a case statement in the function I'm attempting to apply_map(). I've also done flow.visualize() and it does output a picture, but because it's a big flow it's hard to spot the cycle in there. Does anyone have any hints on how to better debug cycles in flow graphs? Thanks in advance.
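One way to locate a cycle without reading the picture is to run a depth-first search over the graph's edges and print the first loop it finds. The following is a minimal sketch, not part of Prefect: `find_cycle` is a hypothetical helper that takes plain (upstream, downstream) pairs, so it makes no assumptions about how the flow stores its graph.

```python
from typing import Dict, Hashable, Iterable, List, Optional, Tuple


def find_cycle(
    edges: Iterable[Tuple[Hashable, Hashable]]
) -> Optional[List[Hashable]]:
    """Return one cycle as a list of nodes (start node repeated at the end), or None."""
    # Build an adjacency list: upstream node -> list of downstream nodes.
    graph: Dict[Hashable, List[Hashable]] = {}
    for upstream, downstream in edges:
        graph.setdefault(upstream, []).append(downstream)

    IN_PROGRESS, DONE = 1, 2
    state: Dict[Hashable, int] = {}

    for start in graph:
        if start in state:
            continue
        # Iterative DFS; `path` mirrors the nodes currently on the DFS stack.
        state[start] = IN_PROGRESS
        path = [start]
        stack = [iter(graph.get(start, []))]
        while stack:
            advanced = False
            for child in stack[-1]:
                if state.get(child) == IN_PROGRESS:
                    # Back edge: `child` is already on the current path, so
                    # the slice from `child` onward is a cycle.
                    return path[path.index(child):] + [child]
                if child not in state:
                    state[child] = IN_PROGRESS
                    path.append(child)
                    stack.append(iter(graph.get(child, [])))
                    advanced = True
                    break
            if not advanced:
                # All children explored: mark the node finished and backtrack.
                state[path.pop()] = DONE
                stack.pop()
    return None


# Toy example with string node names standing in for tasks.
print(find_cycle([("a", "b"), ("b", "c"), ("c", "a"), ("c", "d")]))
```

To use this with a flow, the pairs would need to come from the flow's edge set, for example `(edge.upstream_task, edge.downstream_task)` for each edge in `flow.edges`. That attribute layout is an assumption about the Prefect version in use, and it only helps if the flow object is still available after the error is raised.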

nicholas

12/12/2020, 12:37 AM
Hi @Aiden Price - sorry you're running into that! It's a little tough to debug without more code here, but is it possible the case function is built such that it could reference an instance of a task higher in the flow?

Aiden Price

12/12/2020, 12:39 AM
I'm still working on this. I've narrowed it down to one particular task result that I'm passing in to the function. I don't think I've called for anything within the apply_map that isn't supplied as an argument.
This is the function that I'm apply_map-ing over:
def token_flow(
    key: Dict[str, Any],
    az_policies: Dict[str, Any],
    ftp_client: SFTPFileSystem,
    als_path: str,
    resource: str,
    namespace: str,
    eventhub: str,
    duration: timedelta = timedelta(days=200),
) -> Dict[str, Any]:
    """A series of tasks intended for the `apply_map()` function."""
    action = check_expiry(key, duration)

    with case(action, "push"):
        token1 = generate_token(namespace, eventhub, az_policies, az_policies, duration)
        pushed = ftp_put(ftp_client, als_path, token1)
        key_pushed = update_key(key, action, upstream_tasks=[pushed])

    with case(action, "refresh"):
        refresh = az_eventhub_renew_cmd(
            resouce_group=resource,
            namespace=namespace,
            eventhub=eventhub,
            policy=az_policies,
            key=key,
        )
        refreshed = azcli(refresh)
        key_refreshed = update_key(key, action, duration, upstream_tasks=[refreshed])

    with case(action, "no-action"):
        key_no_action = update_key(key, action)

    return merge(key_pushed, key_refreshed, key_no_action)
And this is how it's called from inside my flow;
keys = apply_map(
    func=token_flow,
    key=key_list,
    az_policies=unmapped(policies),
    ftp_client=unmapped(ftp_client),
    als_path=unmapped(als_path),
    resource=unmapped(resource),
    namespace=unmapped(namespace),
    eventhub=unmapped(eventhub),
    duration=unmapped(duration),
)
If I comment out the az_policies in the flow it works.

nicholas

12/12/2020, 12:52 AM
Nothing is jumping out at the moment as being wrong there, but I'll keep thinking about this and look more in depth when I have some time.

Aiden Price

12/12/2020, 12:52 AM
Thanks

nicholas

12/12/2020, 12:52 AM