prefect-community
  • w

    Wai Kiat Tan

    06/11/2021, 4:51 PM
    i tried to login to prefect cloud but it keeps directing me to https://cloud.prefect.io/welcome/name-team. is it not accessible at the moment?
    n
    j
    • 3
    • 5
  • j

    Joe

    06/11/2021, 7:34 PM
    Hey there! Total noob question, but I'm having trouble finding what I'm looking for elsewhere. Hypothetical: I have five servers running Prefect agents (A, B, C, D, E) all connected to one Prefect Server (F). I write five different Python scripts to run on the different agent servers. Do those scripts need to get loaded onto the five different agent servers (A-E,) or do they all get loaded to the Prefect Server (F) which will then magically run each script on whatever agent machine I specify? Basically, will I have to create a deployment pipeline that drops scripts in one location, or five locations?
    z
    • 2
    • 3
  • l

    Lucas Fobian

    06/11/2021, 8:06 PM
    Hey guys. New to prefect, but already love it. I was wondering if there is a way to debug/view the intermediate result of a task during the flow.run() call? Some task in my pipelines messes up my final output and I would like to be able to quickly tab over the intermediate results of the tasks to see which is causing this behavior. Thank you!
    k
    • 2
    • 5
  • y

    YD

    06/11/2021, 9:16 PM
    How can I make a Prefect.io file handler format log entries just like stdout? I am trying to have a Prefect.io log file, created by a file handler, look exactly like the log that is printed out by the Prefect.io logger, but I get just the message, without the metadata such as the log level. Any ideas on how to do that?
    k
    • 2
    • 6
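A minimal sketch of one way to get file output that matches the console, using only the standard `logging` module. A handler with no formatter writes just the message, which matches the symptom described above. The format string here is inferred from the Prefect console lines quoted later in this channel (for example `[2021-06-16 16:20:11+0200] INFO - prefect.FlowRunner | ...`), so treat it as an assumption rather than Prefect's exact configuration. `add_file_handler` is a hypothetical helper name.

```python
import logging

# Assumed format, inferred from Prefect console output such as:
# [2021-06-16 16:20:11+0200] INFO - prefect.FlowRunner | Beginning Flow run ...
PREFECT_STYLE_FORMAT = "[%(asctime)s] %(levelname)s - %(name)s | %(message)s"
PREFECT_STYLE_DATEFMT = "%Y-%m-%d %H:%M:%S%z"


def add_file_handler(logger: logging.Logger, path: str) -> logging.FileHandler:
    """Attach a file handler whose lines carry timestamp, level and logger name."""
    handler = logging.FileHandler(path)
    # Without this call the handler falls back to a message-only format.
    handler.setFormatter(
        logging.Formatter(PREFECT_STYLE_FORMAT, PREFECT_STYLE_DATEFMT)
    )
    logger.addHandler(handler)
    return handler
```

With Prefect 0.14 the logger passed in would presumably be the one returned by `prefect.utilities.logging.get_logger()`; any `logging.Logger` works for the sketch itself.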
  • y

    YD

    06/11/2021, 11:23 PM
    Does anyone have a small code snippet, example for logging to a database?
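One possible shape for this, sketched with the standard library only: a custom `logging.Handler` that writes each record to a SQLite table. The class name `SQLiteHandler`, the table name `logs` and its columns are all made up for illustration; a production database would need its own driver, batching and error handling.

```python
import logging
import sqlite3


class SQLiteHandler(logging.Handler):
    """Write each log record as a row in a SQLite table (illustrative only)."""

    def __init__(self, db_path: str):
        super().__init__()
        # logging serializes emit() calls with a lock, so sharing the
        # connection across threads is acceptable for this sketch.
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS logs ("
            "created REAL, level TEXT, logger TEXT, message TEXT)"
        )
        self.conn.commit()

    def emit(self, record: logging.LogRecord) -> None:
        try:
            self.conn.execute(
                "INSERT INTO logs VALUES (?, ?, ?, ?)",
                (record.created, record.levelname, record.name, record.getMessage()),
            )
            self.conn.commit()
        except Exception:
            # Never let a logging failure crash the caller.
            self.handleError(record)

    def close(self) -> None:
        self.conn.close()
        super().close()
```

The handler is attached like any other, e.g. `logger.addHandler(SQLiteHandler("flow_logs.db"))`, where the file name is again just an example.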
  • b

    Ben Muller

    06/11/2021, 11:26 PM
    Hi Prefect devs, I've been working on a POC and got everything running with ECS and Fargate. I have what might be a fairly unusual use case where I am scheduling flow runs (through the GraphQL API) for a day at specific intervals to scrape certain data. These intervals are spread out at first and then become closer as the event nears, with flows running every 10 seconds close to the event (think of horse racing data as the race gets closer). Anyway, with ECS, scheduling runs this close together doesn't really work out, as the start-up time for the task definition is ~45 seconds. A possible solution for this would be to give us the ability to run these jobs on Lambdas (they now support Docker image runs and we could keep the Lambda warm for shorter start-up times). Wondering if this is something that Prefect might consider in the future, or is this just too much of a corner case and I'm missing the point on what Prefect should be used for?
    k
    s
    • 3
    • 11
  • t

    Thomas Nyegaard-Signori

    06/14/2021, 10:01 AM
    Hey Community, I am launching some kubernetes pods using the RunNamespacedJob task and I am trying to get the logs from these task pods to show up on the Prefect UI (currently running a Prefect server instance on an Azure VM with in-cluster agents). I end up seeing the
    Started following logs for <task pod>
    and then nothing more other than the completion and deletion log. Meanwhile, if I check the kubernetes job pod logs with
    kubectl logs -n my-namespace prefect-job-...
    I see the logs of the task pod showing up nicely, just not being sent back to the Agent/Server. Anyone have a similar experience and know of that one flag I'm missing?
    k
    m
    m
    • 4
    • 21
  • j

    joshua mclellan

    06/14/2021, 3:17 PM
    hey is there a way to run a subset of a flow? ideally I could pick a single task in a flow and any dependent tasks for that specific flow would also be run
    z
    k
    • 3
    • 5
  • f

    Fina Silva-Santisteban

    06/14/2021, 6:30 PM
    Hi everyone, I’m having trouble switching from Docker storage (which runs successfully) to Github storage.
    flow.storage = GitHub(secrets=["GITHUB_ACCESS_TOKEN"],
                          repo="my_org/repo_name",
                          path="prefect_flows/flows/flow.py",
                          ref="trunk")
    The access token is saved in prefect cloud under the same name. During the flow run prefect can’t seem to be able to find the repo??
    INFO:Downloading flow from GitHub storage - repo: 'my_org/repo_name', path: 'prefect_flows/flows/flow.py', ref: 'trunk'
    
    ERROR: Repo 'my_org/repo_name' not found. Check that it exists (and is spelled correctly), and that you have configured the proper credentials for accessing it.
    
    ERROR: Failed to load and execute Flow's environment: UnknownObjectException(404, {'message': 'Not Found', 'documentation_url': 'https://docs.github.com/rest/reference/repos#get-a-repository'}, {'server': 'GitHub.com', 'date': 'Fri, 11 Jun 2021 22:46:06 GMT', 'content-type': 'application/json; charset=utf-8', 'x-github-media-type': 'github.v3; format=json', 'access-control-expose-headers': 'ETag, Link, Location, Retry-After, X-GitHub-OTP, X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Used, X-RateLimit-Resource, X-RateLimit-Reset, X-OAuth-Scopes, X-Accepted-OAuth-Scopes, X-Poll-Interval, X-GitHub-Media-Type, Deprecation, Sunset', 'access-control-allow-origin': '*', 'strict-transport-security': 'max-age=31536000; includeSubdomains; preload', 'x-frame-options': 'deny', 'x-content-type-options': 'nosniff', 'x-xss-protection': '0', 'referrer-policy': 'origin-when-cross-origin, strict-origin-when-cross-origin', 'content-security-policy': "default-src 'none'", 'vary': 'Accept-Encoding, Accept, X-Requested-With', 'content-encoding': 'gzip', 'x-ratelimit-limit': '60', 'x-ratelimit-remaining': '59', 'x-ratelimit-reset': '1623455165', 'x-ratelimit-resource': 'core', 'x-ratelimit-used': '1', 'content-length': '112', 'x-github-request-id': 'C202:57D9:E1FC5:1D69CC:60C3E7AD'})
    I’ve curled the github api endpoint
    repos/
    directly using the same auth token and I do get the repo’s information. Pls advise! 🙏
    k
    • 2
    • 14
  • j

    Jeremy Phelps

    06/14/2021, 7:26 PM
    Hi everyone. Is there any way to write a general wrapper for the
    Task.run
    method? I'd like to do something like this:
    class MyTask(prefect.tasks.core.function.FunctionTask):
        def run(self, *args, **kwargs):
            try:
                return super().run(*args, **kwargs)
            except BaseException as e:
                prefect.context.get('logger').error('Error on host {}: {}'.format(hostname, format_error(e)))
                raise # Or better yet, mark the task as Failed
    By default, Prefect catches, logs, and marks the task as failed for some errors, but for others, it puts
    str(e)
    in the "State Message" and throws away the stack trace, and for still others, it drops the error completely and the task appears to be "running". My wrapper would fix these problems. But you can't write the method above because there's a function called
    _validate_run_signature
    that expressly forbids it. So the only way to do it that I can think of would be to either wait for Python to get macros, or to add code generation to my deployment process (it would insert the try/except block into the body of every function that has a
    @task
    decorator). Is it really impossible to write this wrapper as an ordinary method?
    k
    m
    • 3
    • 16
  • v

    Verun Rahimtoola

    06/14/2021, 9:06 PM
    Error while deploying flow: ValidationError({'type': ['Unsupported value: UniversalRun']
    with prefect version
    0.14.21
    k
    • 2
    • 64
  • r

    Raúl Mansilla

    06/14/2021, 9:12 PM
    Hello community!! I'm trying to spin up a K8s cluster using the helm chart provided by Prefect, and I'm trying to use an external PostgreSQL database. The thing is: what should I write inside postgresql.externalHostname? The whole PostgreSQL URL with user/password/url/port/database? Or just the URL?
    m
    • 2
    • 25
  • p

    peter zhao

    06/14/2021, 9:43 PM
    💃 Just arrived!
    👋 3
    k
    • 2
    • 1
  • p

    peter zhao

    06/14/2021, 9:44 PM
    hello, i've got a problem with running the ETL tutorial
  • p

    peter zhao

    06/14/2021, 9:45 PM
    specifically i'm getting this error
    requests.exceptions.SSLError: HTTPSConnectionPool(host='opensky-network.org', port=443): Max retries exceeded with url: /api/states/all?lamin=37.15294209056667&lamax=40.75094679823332&lomin=-79.76136868215879&lomax=-75.13474242904122 (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1129)')))
    k
    • 2
    • 18
  • s

    Scott Vermillion

    06/15/2021, 12:36 AM
    So in troubleshooting an error having to do with differences in python versions, I somehow broke what little I had working earlier today. Can anyone shed light on the following error:
    ERROR - <name> | Failed to query for flow run metadata
    
    Traceback (most recent call last):
      File "/home/ubuntu/.local/lib/python3.8/site-packages/prefect/agent/agent.py", line 324, in _submit_deploy_flow_run_jobs
        flow_runs = self._get_flow_run_metadata(flow_run_ids)
      File "/home/ubuntu/.local/lib/python3.8/site-packages/prefect/agent/agent.py", line 684, in _get_flow_run_metadata
        result = self.client.graphql(query)
      File "/home/ubuntu/.local/lib/python3.8/site-packages/prefect/client/client.py", line 319, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['flow_run', 0, 'id'], 'message': 'Cannot return null for non-nullable field flow_run.id.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    I rebuilt my agent (was planning to anyway) but same error right away upon launching. I deleted and reinitialized my project. It doesn’t seem like I’m doing anything differently than before when the agent appeared happy but my issue was further down the line. Really mystified right now, but I’m a bit frazzled from working it so many hours…maybe I’m missing something obvious? (I’m also not the least bit experienced with Prefect - nor very much with AirFlow.)
    m
    m
    • 3
    • 13
  • p

    Prabin Mehta

    06/15/2021, 12:00 PM
    does anybody have any doc to follow for deploying prefect in kubernetes cluster?
    m
    k
    m
    • 4
    • 8
  • t

    Thomas Nyegaard-Signori

    06/15/2021, 12:50 PM
    Is anyone else unsuccessful in storing their flows as scripts using
    azure
    storage? I had a look at comparing the
    s3
    implementation (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/storage/s3.py#L159) to the
    azure
    one (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/storage/azure.py#L128), and it seems like the
    stored_as_script
    flag doesn't do much on the
    azure
    storage. Can anyone from the Prefect team confirm what I am seeing or am I on the wrong track?
    k
    • 2
    • 5
  • t

    Tadas

    06/15/2021, 1:30 PM
    Hey, I have a question. Is it better to store a flow as a pickled object in S3 or as a script (stored_as_script)? What are the drawbacks of each approach?
    k
    • 2
    • 4
  • m

    Milly gupta

    06/15/2021, 1:31 PM
    Hi all, when I delete a project with the Prefect client API client.delete_project, the project is not deleted immediately. Has anyone experienced this before?
    k
    • 2
    • 31
  • w

    Wai Kiat Tan

    06/15/2021, 3:32 PM
    hi all, if i have an upstream prefect task submitting a job to aws batch and a downstream task which executes once the aws batch job completes, is it good to have the downstream task keep checking whether the aws batch job has completed by using the boto3 sdk? or is there a more elegant way to achieve this?
    k
    • 2
    • 6
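For reference, the polling approach described in the question can be sketched as a small helper. This is only an illustration under stated assumptions: `wait_for_job` and its parameters are hypothetical names, and `get_status` stands in for whatever call returns the job's current state. With boto3 that would presumably be something like `client.describe_jobs(jobs=[job_id])["jobs"][0]["status"]`, where AWS Batch reports `SUCCEEDED` or `FAILED` as terminal states.

```python
import time
from typing import Callable

# Terminal AWS Batch job states; every other state means "keep waiting".
TERMINAL_STATES = {"SUCCEEDED", "FAILED"}


def wait_for_job(
    get_status: Callable[[], str],
    poll_seconds: float = 30.0,
    timeout_seconds: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it returns a terminal state or the timeout passes."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status} after {timeout_seconds}s")
        sleep(poll_seconds)
```

Injecting `get_status` and `sleep` keeps the helper testable without AWS credentials. A later message in this channel mentions Prefect's AWSClientWait task, which may cover the same need without hand-written polling.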
  • c

    ciaran

    06/15/2021, 3:48 PM
    Don't suppose anyone is running a
    prefect agent kubernetes
    in a custom Docker Image? I can get my image running on AWS Fargate with:
    command=[
        "prefect",
        "agent",
        "ecs",
        "start",
        "--agent-address",
        "http://:8080",
        "--cluster",
        cluster.cluster_arn,
        "--task-role-arn",
        ecs_task_role.role_arn,
    ],
    But on AKS it's a whole different story, I can't get it running 😭 Essentially it can't find prefect but I can run the container locally and invoke prefect
    k
    z
    • 3
    • 20
  • z

    Zach Schumacher

    06/15/2021, 5:04 PM
    hey guys - noticing some slow downs in prefect cloud today. Things that typically take <1 second (e.g. loading in flows to the flows view) are taking >5 in some cases. Just wanted to make you guys aware!
    n
    • 2
    • 2
  • z

    Zach Schumacher

    06/15/2021, 5:46 PM
    possibly a dumb question, but we just now are getting to the point of running things on schedules so I’m running into this. I want to schedule a flow so that it will run on a schedule in prefect cloud, however i still want it to run right away when flow.run is invoked. How can I do that? The schedule I’m using is below:
    flow.schedule = CronSchedule("30 12 * * *")
    k
    m
    • 3
    • 6
  • k

    Kathryn Klarich

    06/15/2021, 7:59 PM
    I have noticed some unexpected behavior using the prefect AWSClientWait task on long running batch jobs. Sometimes the AWSClientWait task will have failed, and the schematic and run details show the task to have failed, but the flow state hangs in the running state for hours after. This particular task is part of a mapped flow, and other mapped flows failed on the same task, yet their flow state is failed. Any idea why this might occur? The trigger rule for the task that follows the failed task is also all_successful, so it shouldn't be waiting to submit another task.
    k
    • 2
    • 32
  • j

    Justin Liu

    06/15/2021, 8:25 PM
    Hey all, I’m trying to set up an ECS agent to run code stored in GitHub. Right now, I’m able to run basic code, but all of the parameters set in ECSRun() seem to be ignored, like the docker image to use and the cpu/memory. I’ve seen this in a couple other threads but was unable to follow the solutions (something about overriding?) I was able to choose an image location by setting it in the Run mode in prefect cloud UI, but I had to clear the cpu and memory options or else it would give a “string not valid” error. I’m also wondering if there’s a better way to specify the image location w/out the UI?
    k
    • 2
    • 40
  • m

    Matthew Blau

    06/15/2021, 8:30 PM
    Hello all, is there a standard way of migrating Prefect Server from one server to another? We will shortly be switching servers and would like to preserve historical run data and projects
    k
    • 2
    • 5
  • j

    Jason Prado

    06/15/2021, 10:57 PM
    Is there a way to add documentation to a Parameter that shows up in the UI?
    k
    • 2
    • 4
  • b

    Ben Muller

    06/16/2021, 7:21 AM
    Hi Prefect devs, is there a way in prefect to only increase version numbers of flows if there is a change detected? Thinking in terms of a ci/cd pipeline where all flows are registered as part of the deployment. Or is this logic something that is up to us as the users?
    t
    • 2
    • 2
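One way to implement this check on the user side is to hash a serialized form of each flow and only register when the hash changes. The sketch below is plain Python and not a Prefect API: `definition_hash` and `needs_registration` are hypothetical helpers, and the flow definition is assumed to be available as a JSON-serializable dict. To my knowledge Prefect 0.14's `flow.register` also accepts an `idempotency_key`, with `flow.serialized_hash()` as the usual value, but verify that against the docs for your version.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def definition_hash(definition: Dict[str, Any]) -> str:
    """Stable SHA-256 of a JSON-serializable flow definition."""
    # sort_keys makes the hash independent of dict insertion order.
    canonical = json.dumps(definition, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def needs_registration(definition: Dict[str, Any], last_hash: Optional[str]) -> bool:
    """True when the definition differs from the one registered last time."""
    return definition_hash(definition) != last_hash
```

In a CI/CD job, `last_hash` would come from wherever the previous deployment stored it (a file, a tag, a key-value store); that storage is outside the scope of the sketch.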
  • t

    Timo

    06/16/2021, 7:21 AM
    Hey everyone, is it possible to stop a flow when a mapped task fails, without executing the remaining mapped tasks? If not, any suggestions on how to solve this problem? Scenario: executing a pipeline for a list of files (1...n), but if one fails the flow should stop immediately. I tried it with state handlers and the flow does end up failed, but all instances of the mapped task get executed before the flow fails.
    k
    • 2
    • 13
k

Kevin Kho

06/16/2021, 1:36 PM
Hey @Timo, I think the guarantees around stopping a mapped process are weak, especially if executed by a Dask cluster. Did you try the ENDRUN signal though?
t

Timo

06/16/2021, 2:17 PM
Hey @Kevin Kho, thanks for your response. I don't like to run it on a Dask cluster because the sequence of the files matters. I tried ENDRUN but can't get it to work.
from prefect import task, Flow
from prefect.engine.signals import FAIL, ENDRUN
from prefect.engine.state import State, Success


def stateh(obj, old, new: State):
    if new.is_failed():
        raise ENDRUN(new)
    return new


@task
def say_hello(name):
    print(f"Hello {name}")


@task(state_handlers=[stateh])
def forerror(para):
    if para == 2:
        raise FAIL("it's 2")
    else:
        print(para)
        return para + 1

with Flow("hello_flow") as flow:
    ls = [1, 2, 3]

    sh = say_hello("John")
    e = forerror.map(ls)


if __name__ == "__main__":
    flow.run()
k

Kevin Kho

06/16/2021, 2:18 PM
I think in this case, doing it in the state handler is too late for you so you need to raise
ENDRUN
instead of fail when
para == 2
t

Timo

06/16/2021, 2:21 PM
This did not work either 😞
from prefect import task, Flow
from prefect.engine.signals import FAIL, ENDRUN
from prefect.engine.state import Failed, State

def stateh(obj, old, new: State):
    if new.is_failed():
        raise ENDRUN(new)
    return new


@task
def say_hello(name):
    print(f"Hello {name}")


# @task(state_handlers=[stateh])
@task
def forerror(para):
    if para == 2:
        # raise FAIL("it's 2")
        state = Failed("it's 2")
        raise ENDRUN(state)
    else:
        print(para)
        return para + 1


with Flow("hello_flow") as flow:
    ls = [1, 2, 3]

    sh = say_hello("John")
    e = forerror.map(ls)


if __name__ == "__main__":
    flow.run()
[2021-06-16 16:20:11+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'hello_flow'
[2021-06-16 16:20:11+0200] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-06-16 16:20:11+0200] DEBUG - prefect.FlowRunner | Flow 'hello_flow': Handling state change from Scheduled to Running
[2021-06-16 16:20:11+0200] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
[2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Handling state change from Pending to Running
[2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Calling task.run() method...
Hello John
[2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Handling state change from Running to Success
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror': Handling state change from Pending to Mapped
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror': Finished task run for task with final state: 'Mapped'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[0]': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Handling state change from Pending to Running
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Calling task.run() method...
1
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Handling state change from Running to Success
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[0]': Finished task run for task with final state: 'Success'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[1]': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Handling state change from Pending to Running
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Calling task.run() method...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Handling state change from Running to Failed
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[1]': Finished task run for task with final state: 'Failed'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[2]': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Handling state change from Pending to Running
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Calling task.run() method...
3
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Handling state change from Running to Success
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[2]': Finished task run for task with final state: 'Success'
[2021-06-16 16:20:12+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
[2021-06-16 16:20:12+0200] DEBUG - prefect.FlowRunner | Flow 'hello_flow': Handling state change from Running to Failed
k

Kevin Kho

06/16/2021, 2:22 PM
Will give this a shot
Yeah, I think there is no way to cancel like this when the task is mapped, because map was designed for parallel execution and it’s hard to cancel other parallel threads.
👍 1
t

Timo

06/17/2021, 5:39 AM
Thanks for the response. This totally makes sense. What i really need in this case is some kind of looping (https://docs.prefect.io/core/advanced_tutorials/task-looping.html) . But currently I have no idea how to loop over a bunch of tasks or the entire flow.
I found this way in another community thread. It works with a static input. But it doesn't work with a depending task which delivers the input (use
ls = get_list()
instead of
ls = [1,2,3]
). I receive a
Task is not iterable
error if I use the output of the get_list() task
from prefect import task, Flow
from prefect.engine.signals import FAIL

import prefect

LOGGER = prefect.context.get("logger")


@task
def say_hello(name):
    LOGGER.info(f"Hello {name}")
    return name


@task
def forerror(para):
    if para == 2:
        raise FAIL("it's 2")
    else:
        print(para)
        return para + 1


@task
def get_list():
    return [1, 2, 3]


with Flow("hello_flow") as flow:
    # ls = get_list()
    ls = [1, 2, 3]
    h_tasks = [say_hello("John") for x in ls]
    e_tasks = [forerror(para=x) for x in ls]

    for i in range(0, len(ls)):
        e_tasks[i].set_upstream(h_tasks[i])
        if i > 0:
            e_tasks[i].set_upstream(e_tasks[i - 1])

if __name__ == "__main__":
    flow.run()
k

Kevin Kho

06/17/2021, 1:53 PM
I think we can get this to work. I’ll work on this in a bit
This might be the easiest for your use case:
from prefect import task, Flow
from prefect.engine.signals import FAIL
import prefect

LOGGER = prefect.context.get("logger")


@task
def say_hello(name):
    LOGGER.info(f"Hello {name}")
    return name


@task
def forerror(para):
    if para == 2:
        raise FAIL("it's 2")
    else:
        print(para)
        return para + 1


@task
def get_list():
    return [1, 2, 3]


@task
def helper(ls):
    # Call the tasks' underlying functions one item at a time, in order;
    # a FAIL raised by forerror.run() stops the loop immediately.
    for x in ls:
        say_hello.run("John")
        y = forerror.run(para=x)
    return y


with Flow("hello_flow") as flow:
    ls = get_list()
    helper(ls)

flow.run()
❤️ 1
I can help you dive into Task Looping if you want. I’m aware this doesn’t give you monitoring for each of 1, 2, and 3. We can shape this into Task Looping if you want.
t

Timo

06/18/2021, 5:56 AM
Thank you very much. This is working as expected. I didn't know that I could run tasks within a task by simply using
run()
(which totally makes sense because it's all Python)... The downside of this approach is that I can't monitor each "sub" task, as you said. Therefore implementing Task Looping would be great. Could I reuse existing "official" Prefect tasks within a Task Looping construct? The example in the docs shows only custom tasks (with the @task decorator) raising the LOOP signal. Another question I have: could I use the map function with each or one of the constructs? E.g. I have a list of files which is split by days:
[[file1-day1.zip,file2-day1.zip,....,fileN-day1.zip], [file1-day2.zip,file2-day2.zip,....,fileN-day2.zip]]
Now I'd like to iterate over the sequence, but within the sequence I'd like to use map to extract all zip files with the Unzip task. Currently I get
ValueError: Could not infer an active Flow context.
. As I discovered, I could use
StartFlowRun
to start another flow which implements the mapping routine. But this cannot be tested locally, as
StartFlowRun
only works with cloud or server.
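The pattern asked about here (days strictly in order, files within a day in parallel, stop at the first failing day) can be sketched in plain Python, outside of Prefect's mapping machinery. This is an assumption-laden illustration and not a Prefect feature: `process_days_in_order` and `handle_file` are hypothetical names, and in a flow the function body would presumably live inside a single task, with `handle_file` calling a task's `.run()` as in the helper example above.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def process_days_in_order(
    days: Sequence[Sequence[str]],
    handle_file: Callable[[str], str],
    max_workers: int = 4,
) -> List[List[str]]:
    """Process each day's files in parallel, but the days strictly in sequence.

    If any file in a day raises, the exception propagates and later days
    are never started. Files already submitted for the failing day still
    run to completion before the exception leaves the function.
    """
    results = []
    for day_files in days:
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            # list() collects results in input order and re-raises the
            # first exception raised by handle_file.
            results.append(list(pool.map(handle_file, day_files)))
    return results
```

As with the helper task, this gives no per-file monitoring in the UI; it only shows the ordering and early-stop behaviour.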