• Alex Papanicolaou

    2 years ago
    Hi, we’re getting some mapped child tasks rerun despite success on the first run. Normally, our tasks look like this (logs cleaned up a bit):
    12:10:11 prefect.CloudTaskRunner	Task 'run_simulator[54]': Starting task run...
    12:10:11 prefect.run_simulator[54]	Starting simulator run
    12:10:11 prefect.run_simulator[54]	cusip_list [{'secmnem': 'FNMMA3057', 'cusip': '31418CMF8'}]
    12:10:11 prefect.run_simulator[54]	Loading model 'cf621134-8c36-446a-96b5-7ecde88a33e2'
    12:10:22 prefect.run_simulator[54]	Simulating pool {'secmnem': 'FNMMA3057', 'cusip': '31418CMF8'}
    12:10:31 prefect.run_simulator[54]	Number of replicates 6
    12:11:59 prefect.CloudTaskRunner	Task 'run_simulator[54]': finished task run for task with final state: 'Success'
    Here is an example though (and they don’t appear super common) where the task succeeded and then was later rerun. One thing you can note is that the model id is different. This is randomly generated (not a big deal), but along with the timestamp it confirms that this is a repeated run, not a duplicated log.
    11:55:34 prefect.CloudTaskRunner	Task 'run_simulator[6]': Starting task run...
    11:55:35 prefect.run_simulator[6]	Starting simulator run
    11:55:35 prefect.run_simulator[6]	cusip_list [{'secmnem': 'FNMMA3774', 'cusip': '31418DFQ0'}]
    11:55:35 prefect.run_simulator[6]	Loading model 'c410358f-4612-4aef-8f12-e9a3642711de'
    11:56:23 prefect.run_simulator[6]	Simulating pool {'secmnem': 'FNMMA3774', 'cusip': '31418DFQ0'}
    11:56:36 prefect.run_simulator[6]	Number of replicates 3
    11:57:12 prefect.CloudTaskRunner	Task 'run_simulator[6]': finished task run for task with final state: 'Success'
    12:06:17 prefect.CloudTaskRunner	Task 'run_simulator[6]': Starting task run...
    12:06:17 prefect.run_simulator[6]	Starting simulator run
    12:06:17 prefect.run_simulator[6]	cusip_list [{'secmnem': 'FNMMA3774', 'cusip': '31418DFQ0'}]
    12:06:17 prefect.run_simulator[6]	Loading model '45322fce-d452-4340-9e06-e7bcc2775b84'
    12:06:27 prefect.run_simulator[6]	Simulating pool {'secmnem': 'FNMMA3774', 'cusip': '31418DFQ0'}
    12:06:40 prefect.run_simulator[6]	Number of replicates 3
    12:07:15 prefect.CloudTaskRunner	Task 'run_simulator[6]': finished task run for task with final state: 'Success'
  • Johnny Bravo

    2 years ago
    Hello. Noob question here. I'm trying to download a couple of files from a website and want to do the jobs in parallel while limiting the workers (so I can download at most 2 files at a time). I've figured out how to limit the concurrent downloads with this run:
    from dask.distributed import LocalCluster
    from prefect.engine.executors import DaskExecutor

    flow.run(executor=DaskExecutor(
        cluster_class=LocalCluster, cluster_kwargs={"n_workers": 2, "threads_per_worker": 1}))
    Not sure if this is the right way to do it. Now I have this flow:
    with Flow("Files downloader") as flow:
            files = get_files()
            downloaded_files = download_file.map(files)
            import_file.map(downloaded_files)
    The problem here is, after the first two downloads, it goes to the next download task instead of getting to the import task. So, because I'm limited to 2 workers at a time, I need to prioritize the import_file task over the download_file task. Is there a better way to do this?
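    A minimal sketch of one possible workaround, assuming the two steps can be fused (the download/import helpers here are hypothetical): combining them into a single mapped task means each worker imports a file as soon as it finishes downloading it, regardless of how Dask orders the queue:
    from prefect import Flow, task

    @task
    def download_and_import(url: str) -> str:
        # Hypothetical fusion of the two steps: a worker completes the
        # import for its file before it picks up the next download.
        path = download_to_disk(url)    # hypothetical helper
        import_file_contents(path)      # hypothetical helper
        return path

    with Flow("Files downloader") as flow:
        files = get_files()
        download_and_import.map(files)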
  • Hawkar Mahmod

    2 years ago
    Hi folks, is it possible to map a task that is using a LOOP signal? If I have a task that calls an API and has to loop until it has no more data to fetch, can I pass each iteration of the call to a downstream task like so:
    results = transform_data.map(call_api())
    Inside call_api I am using a LOOP signal, but I cannot seem to access the loop results in the next task, transform_data. My understanding was that when using this construct, each iteration of the task was its own task.
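    For context, in Prefect 1.x all LOOP iterations happen inside a single task run, and only the value returned on the final iteration is passed downstream - so one option is to accumulate pages in the loop payload and return the whole list for mapping. A minimal sketch (fetch_page is a hypothetical API helper):
    import prefect
    from prefect import Flow, task
    from prefect.engine.signals import LOOP

    @task
    def call_api():
        # Results accumulated by previous loop iterations, if any.
        payload = prefect.context.get("task_loop_result", {"pages": []})
        pages = payload["pages"]
        next_page = fetch_page(offset=len(pages))  # hypothetical API helper
        if next_page:
            raise LOOP(result={"pages": pages + [next_page]})
        return pages  # final return value: the full list, ready for .map

    with Flow("loop-then-map") as flow:
        results = transform_data.map(call_api())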
  • Chris Goddard

    2 years ago
    Hey folks - I'm having issues getting a docker agent to run flows locally. I'm using Prefect Cloud - I have confirmed that I can run a non-docker flow locally and trigger it from the cloud UI (so backend is correctly set and the runner token is available as an environment variable). However, when I give the flow Docker storage and spin up a docker agent, nothing happens when I try to trigger a flow from the UI - no errors, it's just like it's not receiving any instructions from Prefect Cloud. I am working in WSL2 (Windows Subsystem for Linux) - which creates all kinds of hellish networking issues (classic Windows) - but I've confirmed that Docker is working, and I've run the docker image that was created for my flow and run the flow manually within the container by unpickling it and calling flow.run. The one thing I thought it might be was a failure to connect to the docker daemon (in case WSL ran it somewhere else), but I've confirmed that it's running at unix:///var/run/docker.sock (I think earlier versions of WSL had an issue, but I don't think that's what's going on). What else could I try? Any suggestions?
    prefect diagnostics output:
    {
      "config_overrides": {
        "cloud": {
          "agent": {
            "auth_token": true
          }
        },
        "context": {
          "secrets": false
        }
      },
      "env_vars": [
        "PREFECT__CLOUD__AGENT__AUTH_TOKEN"
      ],
      "system_information": {
        "platform": "Linux-4.19.104-microsoft-standard-x86_64-with-glibc2.29",
        "prefect_version": "0.13.4",
        "python_version": "3.8.1"
      }
    }
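    A quick sanity check of the daemon connection from the agent's Python environment, sketched with the docker package the agent relies on under the hood:
    import docker

    # Connects the same way the agent does by default (honoring DOCKER_HOST
    # if set); version() raises if the daemon at
    # unix:///var/run/docker.sock is unreachable.
    client = docker.from_env()
    print(client.version())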
  • Minakshi

    2 years ago
    Is there a way to run multiple flows using Prefect Core? This doesn't seem to work: it only starts the flow for the first dataset and then continues running it after the specified interval.
    for dataset in dataset_config['datasets']:
        print('starting flow for dataset' + dataset['dataset_name'])
        flow.run(dataset=dataset['dataset_name'])  # runs this flow on its schedule
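    One likely culprit (a hedged guess, since the flow definition isn't shown): flow.run() on a flow with a schedule blocks waiting for the next scheduled run, so the loop never reaches the second dataset. If the goal is one immediate run per dataset, Prefect 1.x lets you bypass the schedule, as in this minimal sketch:
    # Assumes `flow` exposes a `dataset` Parameter, as in the snippet above.
    for dataset in dataset_config['datasets']:
        print('starting flow for dataset ' + dataset['dataset_name'])
        flow.run(dataset=dataset['dataset_name'], run_on_schedule=False)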
  • Eric

    2 years ago
    Hello. Noob question here. Is there a way to use Python to "quick run" a flow (create a flow run) which has been registered on Prefect Server? I've seen the 'retrieve flow' doc, but after retrieving the flow, flow.run() is executed by Python locally. If I could trigger the "quick run" with Python, I could see the logs on the Prefect page instead of just on the console. Thanks!
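    A minimal sketch of one way to do this with the Prefect 1.x Client (the flow id placeholder is hypothetical; the real id is shown on the flow's page in the UI):
    from prefect import Client

    client = Client()  # assumes the server endpoint is already configured locally
    flow_run_id = client.create_flow_run(flow_id="<your-flow-id>")  # placeholder id
    print(flow_run_id)  # the run and its logs then appear in the server UI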
  • Scott Zelenka

    2 years ago
    I'm working on a Flow which calls a rate-limited API that will occasionally throw a 429 exception. The 429 response is explicit about how long it wants the client to wait until it retries, but we already have the retry logic specified in the @task decorator. So it gets stuck in a perpetual loop, because the server wants the client to wait longer than what was specified in the @task decorator. The Task Concurrency Limiting feature would reduce how often this happens, but would not catch all 429 exceptions. Sometimes this specific server gets overloaded by other traffic and will dynamically rate-limit all traffic until it has scaled up to handle the additional load. I'm guessing that I'd need to write some custom retry logic within the task to handle the 429 exceptions, but curious if anyone else has a way to pipe the Retry-After from a 429 into the Prefect engine's retry_delay parameter for similar rate-limited API calls?
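    One pattern that may fit, sketched minimally under Prefect 1.x assumptions (the URL and header handling are illustrative): raise the RETRY signal manually with a start_time computed from the Retry-After header, rather than relying on the decorator's fixed retry_delay:
    import pendulum
    import requests
    from prefect import task
    from prefect.engine.signals import RETRY

    @task
    def call_rate_limited_api(url: str) -> dict:
        resp = requests.get(url)
        if resp.status_code == 429:
            # Honor the server's requested wait instead of a static retry_delay.
            delay = int(resp.headers.get("Retry-After", "60"))
            raise RETRY(
                message="429 received; retrying after server-specified delay",
                start_time=pendulum.now("UTC").add(seconds=delay),
            )
        resp.raise_for_status()
        return resp.json()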
  • Marwan Sarieddine

    2 years ago
    Hi folks, my flow run is getting stuck on Scheduled and is not getting submitted - anyone else facing similar issues? I am using Prefect v0.13.4, Prefect Cloud, with a Kubernetes agent and a static Dask setup; the flow is stored using S3 storage. Here is my flow-run id: 68e72aba-f69b-4238-9478-82fa9bfa5706
  • Jonas Hanfland

    2 years ago
    Hello guys, I'm trying to map over pandas dataframe groups, but I can't quite get it to work even though I am able to iterate over it in a python for-loop just fine. This is what I'm trying to do:
    some_task.map(df.groupby("id"))
    But it gives me:
    KeyError: 'Column not found: 0'
    Does anyone know if mapping over groups is supported and, if so, how? Thanks in advance!
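    A minimal sketch of one workaround, assuming the goal is one mapped run per group: Prefect maps over sequences, and a pandas GroupBy object isn't consumed as one, so materialize the groups in an upstream task first (df and some_task are from the question):
    from prefect import Flow, task

    @task
    def split_groups(df):
        # Return a plain list of per-group DataFrames that .map can iterate over.
        return [group for _, group in df.groupby("id")]

    with Flow("map-over-groups") as flow:
        results = some_task.map(split_groups(df))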
  • Vikram Iyer

    2 years ago
    Hi, I am getting the below error all of a sudden. The setup had been working fine for me before.
    ad-agent_1              |   File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 941, in raise_for_status
    ad-agent_1              |     raise HTTPError(http_error_msg, response=self)
    ad-agent_1              | requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http://ad-prefect-apollo:4200/
    The env variable on the agent container is:
    PREFECT__SERVER__ENDPOINT: http://ad-prefect-apollo:4200
    I am running all the required services inside docker containers using a docker-compose file. Can anyone check and help out?