prefect-community
  • j

    John Muehlhausen

    11/15/2021, 5:09 PM
    Anyone ever see all tasks complete but the flow just keeps going and going?
    k
    • 2
    • 3
  • c

    Charles Liu

    11/15/2021, 5:21 PM
    Just saw the Orion announcement, sounds awesome! Can't wait to dig into it.
    :marvin: 4
    🚀 3
    j
    • 2
    • 1
  • j

    John Jacoby

    11/15/2021, 5:35 PM
    Another question (I think combining other features with persistent results is breaking some things today): Any idea why I would be getting 'Object of type Parameter is not JSON serializable'?
    k
    • 2
    • 3
  • j

    John Jacoby

    11/15/2021, 5:35 PM
    The parameter is just a list of strings
  • j

    John Jacoby

    11/15/2021, 5:35 PM
    The error happens before hitting any other task
  • p

    Paulo Maia

    11/15/2021, 5:39 PM
    Hi everyone! How would I trigger "EmailTask" on failed/success state for the whole flow? My goal is to send the code error if it fails, and a success message if it doesn't. I've seen the state handlers for Gmail but I want to use a custom SMTP. I succeeded in running a task for it but would like it to run only on those specific flow states. Thanks! https://docs.prefect.io/api/latest/tasks/notifications.html
    k
    • 2
    • 3
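    For reference, a minimal sketch of one way to do this with a flow-level state handler and Python's standard smtplib; the SMTP host, credentials, and addresses below are placeholders, not values from the thread:
    import smtplib
    from email.message import EmailMessage

    from prefect import Flow, task

    def email_on_finish(flow, old_state, new_state):
        # Called on every flow state change; only act on terminal states.
        if new_state.is_failed() or new_state.is_successful():
            msg = EmailMessage()
            msg["Subject"] = f"Flow {flow.name} finished: {new_state}"
            msg["From"] = "alerts@example.com"        # placeholder
            msg["To"] = "me@example.com"              # placeholder
            msg.set_content(new_state.message or repr(new_state))
            with smtplib.SMTP("smtp.example.com", 587) as server:   # placeholder SMTP host
                server.starttls()
                server.login("smtp-user", "smtp-password")          # placeholder credentials
                server.send_message(msg)
        return new_state

    @task
    def do_work():
        return 1

    with Flow("smtp-notify", state_handlers=[email_on_finish]) as flow:
        do_work()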
  • g

    Giovanni Giacco

    11/15/2021, 6:28 PM
    I can’t understand why I have a flow where a task runs more than once. For example, in the image there is a Parameter task. Any suggestions?
    k
    • 2
    • 13
  • b

    Brad

    11/15/2021, 8:34 PM
    Hi team - quick one; I was just getting a friend up and running with prefect (orion) but he kept hitting
    sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) unable to open database file
    when trying to run orion start / reset db. We finally worked out it was because he didn’t have a
    ~/.prefect
    folder, but this was a pretty cryptic error message to receive
    k
    m
    • 3
    • 8
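    A tiny sketch of the workaround implied above, assuming the only problem really is the missing home directory: create ~/.prefect before running orion start.
    from pathlib import Path

    # Ensure Prefect's default home directory exists so SQLite can create its database file.
    Path.home().joinpath(".prefect").mkdir(parents=True, exist_ok=True)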
  • f

    Frank Oplinger

    11/15/2021, 9:39 PM
    Hello again, I am struggling to get logging working from within a task. The code I’m using is taken from the docs:
    @task
    def generate_set_sublists(inspection_id):
        logger = prefect.context.get("logger")
        logger.info("An info message.")
        logger.warning("A warning message.")
    But, despite the task completing successfully, I can’t see those logs in the prefect cloud api. I do however see the ‘Starting task run…’ and ‘Finished task run for task with final state: Success’ logs that seem like the default. I am currently running my flow using the ECS Agent using Fargate. Thanks in advance.
    k
    • 2
    • 5
  • m

    Marwan Sarieddine

    11/15/2021, 10:27 PM
    Hi folks - is it possible to restart a flow run but with a different context or run config ?
    k
    • 2
    • 7
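    Restarting an existing run in place may not allow this, but a sketch of a close alternative, assuming Prefect 1.x's Client.create_flow_run accepts run_config and context (as I recall it does): kick off a fresh run of the same flow with the new settings. The flow ID and image below are hypothetical.
    from prefect import Client
    from prefect.run_configs import KubernetesRun

    client = Client()
    # Start a new run of the same registered flow, overriding the run config and context.
    client.create_flow_run(
        flow_id="00000000-0000-0000-0000-000000000000",           # hypothetical flow ID
        run_config=KubernetesRun(image="my-registry/img:retry"),  # hypothetical image
        context={"retry_mode": True},
    )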
  • k

    kevin

    11/15/2021, 10:38 PM
    hey guys, I'm having a bit of a weird issue where a specific task is causing this error in Prefect:
    Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State payload is too large.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
        state = self.client.set_task_run_state(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1518, in set_task_run_state
        result = self.graphql(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 319, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State payload is too large.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    It appears to me that Prefect considers the JSON payload I'm managing "too large"; however, from what I can see, the payload I'm sending is only on the scale of ~100 KB, which intuitively isn't that large. Is there some limitation in Prefect that I'm accidentally breaching, or is there something else I should be investigating in my code that could be causing this?
    m
    • 2
    • 32
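    One mitigation sometimes suggested for this error is keeping large task outputs out of the state payload by checkpointing them to external storage. A sketch assuming Prefect 1.x and a hypothetical S3 bucket:
    from prefect import task
    from prefect.engine.results import S3Result

    # The task's return value is written to S3 and only its location travels with the
    # task state, instead of the full serialized payload.
    @task(checkpoint=True, result=S3Result(bucket="my-results-bucket"))  # hypothetical bucket
    def produce_large_payload():
        return {"rows": list(range(100_000))}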
  • m

    Matt Alhonte

    11/16/2021, 1:14 AM
    Having an issue with Parameters not making it to Flow runs - I think it's because I subclassed another task? I made one that subclassed
    StartFlowRun
    and one of the arguments to it comes from a parameter, and the Task Schematic doesn't even show it exists. I.e.,
    signal = Parameter(
        "signal", default="1a,1b,2a,2b,1c,1d,2d,1e,2e,1f,2f,1z,3a,3b"
    )

    nba1a_results = StartSkippableRun(
        signal=signal,
        nb_id="1a",
        flow_name="TestNBA1a",
        project_name=project_name,
        wait=True,
        parameters=task_parameters,
    )
    signal
    doesn't seem to register.
    k
    a
    • 3
    • 32
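    In Prefect 1.x, only arguments passed when the task is called inside the Flow block are resolved from Parameters at runtime; arguments passed to __init__ are stored as-is at build time. A sketch of how a subclass like this might accept the parameter through run() instead (the skipping behavior of StartSkippableRun is assumed here, and the project name is a placeholder):
    from prefect import Flow, Parameter
    from prefect.engine.signals import SKIP
    from prefect.tasks.prefect import StartFlowRun

    class StartSkippableRun(StartFlowRun):
        def __init__(self, nb_id=None, **kwargs):
            self.nb_id = nb_id
            super().__init__(**kwargs)

        # `signal` is accepted by run(), so passing a Parameter at call time
        # registers it as an upstream dependency and resolves it at runtime.
        def run(self, signal=None, **kwargs):
            if signal is not None and self.nb_id not in signal.split(","):
                raise SKIP(f"{self.nb_id} not in signal")
            return super().run(**kwargs)

    with Flow("parent") as flow:
        signal = Parameter("signal", default="1a,1b,2a,2b")
        nba1a_results = StartSkippableRun(
            nb_id="1a", flow_name="TestNBA1a", project_name="my-project", wait=True
        )(signal=signal)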
  • j

    John T

    11/16/2021, 2:46 AM
    Hello Prefect friends, I’ve come here to be a bother again. So, we are leveraging Pydantic objects as return values, but after running our first flow, we quickly realized that the
    PrefectResult
    requires the result to be JSON serializable. After digging into where
    PrefectResult
    was being used, we realized that we could change the serializer from
    JSONSerializer()
    to
    PickleSerializer()
    and have our pydantic objects move around seamlessly. We are however curious if changing the PrefectResult serializer to a pickling one can introduce some unforeseen consequences. Thanks in advance for any clarification!
    k
    • 2
    • 4
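    For reference, a sketch of the swap being described, assuming PrefectResult forwards the serializer keyword to the Result base class; the open question is whether pickled results have downsides such as being unreadable across library/Python versions or opaque in the UI:
    from prefect import task
    from prefect.engine.results import PrefectResult
    from prefect.engine.serializers import PickleSerializer
    from pydantic import BaseModel

    class Record(BaseModel):
        name: str
        value: int

    # Swap the default JSONSerializer for a PickleSerializer so non-JSON-serializable
    # return values (like pydantic models) can still round-trip through the result.
    @task(result=PrefectResult(serializer=PickleSerializer()))
    def make_record():
        return Record(name="a", value=1)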
  • s

    Sergey

    11/16/2021, 8:20 AM
    Tell me: when executing the construct with case(list_check(new_events), True): ... and with case(list_check(new_events), False): ..., my task returns False, but for some reason everything inside the case block is not executed; it is marked as Skipped.
    a
    k
    • 3
    • 9
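    One thing that can trip this up: calling list_check(...) separately inside each with case(...) block creates two distinct runs of the condition task. A sketch of the usual pattern, evaluating the condition once and reusing it in both branches (the task bodies are placeholders):
    from prefect import Flow, Parameter, case, task

    @task
    def list_check(new_events):
        return len(new_events) > 0

    @task
    def process_events(new_events):
        print(f"processing {len(new_events)} events")

    @task
    def report_nothing_new():
        print("no new events")

    with Flow("case-demo") as flow:
        new_events = Parameter("new_events", default=[])
        has_events = list_check(new_events)   # evaluate the condition once
        with case(has_events, True):          # runs only when the check is True
            process_events(new_events)
        with case(has_events, False):         # runs only when the check is False
            report_nothing_new()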
  • c

    Chris Arderne

    11/16/2021, 10:32 AM
    Is there a way to tell in a task/flow if a Flow run is being run locally (with
    prefect run -p …
    ) or registered/from Cloud? Context in this issue.
    a
    k
    • 3
    • 5
  • h

    Huw Ringer

    11/16/2021, 10:55 AM
    Hi, I’m trying to use the Advanced Run Configuration settings in Prefect Cloud to create a key:value Context that I can then refer to in my code - but using
    prefect.context['my_run_parameter']
    in my Python code results in the following runtime error:
    Failed to load and execute Flow's environment: KeyError('my_run_parameter',)
    I’ve tried lots of variations on this syntax e.g.
    prefect.context.get('my_run_parameter')
    but nothing seems to work. Any advice/assistance you can provide would be gratefully received, thanks!
    a
    • 2
    • 14
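    One detail worth checking, sketched below: context is only populated while the flow runs, so reading prefect.context at module level (while the flow is being loaded) raises exactly this kind of KeyError, whereas reading it inside a task with a default behaves more gracefully. The key name follows the message above.
    import prefect
    from prefect import Flow, task

    @task
    def use_run_parameter():
        # Read the context key at runtime, inside a task, with a fallback
        # in case the run wasn't started with that context value.
        value = prefect.context.get("my_run_parameter", "default-value")
        prefect.context.get("logger").info("my_run_parameter=%s", value)
        return value

    with Flow("context-demo") as flow:
        use_run_parameter()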
  • a

    Amanda Wee

    11/16/2021, 11:02 AM
    I recall seeing Prefect Artifacts mentioned here recently, but I couldn't find it in the docs. Is it still a very experimental feature? Also, is it available with Prefect Server rather than being Prefect Cloud-only? Thanks!
    a
    • 2
    • 1
  • j

    John Shearer

    11/16/2021, 11:35 AM
    I'm running Prefect from tests, which works great. Currently, failed tasks log that failure at INFO level; I wonder if WARNING or ERROR might be more appropriate? It would certainly make it more obvious to me in logs/output. Here's an example
    a
    • 2
    • 6
  • d

    Dekel R

    11/16/2021, 2:32 PM
    Hey everyone, I wrote a pretty simple first flow - getting data from an API, saving it, then parsing it (as another task) and saving the parsed data. My code uses some env vars and then it gets to the part where I actually build the flow (both are in the same .py file). I also want to package the whole thing in a Docker image - please see some snippets (the important sections are “run_remotely” and “data_flow”):
    init_logger()
    log = logging.getLogger(LOGGER_NAME)
    
    
    def data_flow():
        log.info('Running the data api workflow')
        with Flow("data_to_parquet_flow", storage=Docker(registry_url="us-central1/something/xx",
                                                                 dockerfile="./Dockerfile", stored_as_script=True,
                                                                 path='path_to_the_current_file.py')) as flow:
            json_bucket_prefix = fetch_data(start_date=start_date, end_date=end_date, token=token)
            parse_jsons_to_parquet(files_prefix=json_bucket_prefix)
        return flow
    
    
    def run_local():
        flow = data_flow()
        flow.run()
    
    
    def run_remotely():
        flow = data_flow()
        client = prefect.Client(api_key=prefect_token)
        client.register(flow, project_name=PREFECT_PROJECT_NAME)
    
    
    if __name__ == '__main__':
        try:
            start_date, end_date, token, prefect_token = get_workflow_env_vars()
            if prefect_token:
                run_remotely()
            else:
                run_local()
        except Exception as e:
            log.error(e, exc_info=True)
    Now I’m running the script and most of the build goes as planned, but then I get this error (after “init_logger()” runs as expected):
    Traceback (most recent call last):
      File "/opt/prefect/healthcheck.py", line 150, in <module>
        flows = import_flow_from_script_check(flow_file_paths)
      File "/opt/prefect/healthcheck.py", line 63, in import_flow_from_script_check
        flows.append(extract_flow_from_file(file_path=flow_file_path))
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/storage.py", line 104, in extract_flow_from_file
        raise ValueError("No flows found in file.")
    ValueError: No flows found in file.
    This is my Dockerfile:
    FROM python:3.8-slim
    
    WORKDIR /app
    
    COPY ./requirements.txt /app/requirements.txt
    RUN pip install -r requirements.txt
    
    ENV PYTHONPATH /app/
    
    COPY ./ /app/
    The Dockerfile is in the same directory as the file I posted at the top. Has anyone had this issue? Any help would be appreciated. Thanks!
    👀 1
    a
    • 2
    • 24
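    A sketch of one likely cause, given the snippets above: with stored_as_script=True the Docker healthcheck imports the file and looks for Flow objects defined at module level, but here the flow is only built inside data_flow(), so nothing is found. Moving the flow to module scope would look roughly like the following; fetch_data, parse_jsons_to_parquet, start_date, end_date, token, and the registry URL are reused from the snippet above and would need to be resolvable at import time (or turned into Parameters):
    from prefect import Flow
    from prefect.storage import Docker

    # Build the flow when the module is imported, so extract_flow_from_file can find it.
    storage = Docker(
        registry_url="us-central1/something/xx",
        dockerfile="./Dockerfile",
        stored_as_script=True,
        path="path_to_the_current_file.py",
    )

    with Flow("data_to_parquet_flow", storage=storage) as flow:
        json_bucket_prefix = fetch_data(start_date=start_date, end_date=end_date, token=token)
        parse_jsons_to_parquet(files_prefix=json_bucket_prefix)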
  • j

    Jean-Baptiste Six

    11/16/2021, 4:31 PM
    Hello 🙂 I'm new to Prefect and I'd like to know if it's possible to combine it with async? I need something like this:
    @task
    async def mytask1():
        await do_smth()
    
    @task
    async def mytask2():
        await do_smth()
    
    async def my_flow():
        with Flow("Test") as flow:
            await mytask1()
            mytask2()
    
        flow.run()
    The goal is to be able to execute async functions; my flows will stay synchronous relative to each other.
    k
    • 2
    • 4
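    Prefect 1.x tasks are invoked synchronously by the task runner, so a common workaround (a sketch, not an official async API) is to drive each coroutine to completion inside a regular task with asyncio.run:
    import asyncio
    from prefect import Flow, task

    async def do_smth():
        await asyncio.sleep(1)

    @task
    def mytask1():
        # Run the coroutine to completion inside a synchronous task.
        asyncio.run(do_smth())

    @task
    def mytask2():
        asyncio.run(do_smth())

    with Flow("Test") as flow:
        mytask1()
        mytask2()

    if __name__ == "__main__":
        flow.run()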
  • d

    Dimas Gonzales

    11/16/2021, 4:47 PM
    Howdy team, I am trying to create an account on Prefect Cloud but I am getting an HTTP response code of 400: "Unable to JIT the user". I've tried with Google, GitHub, and creating an email-only account. Could someone point me to who I could reach out to on the Prefect Cloud team for help?
    k
    k
    • 3
    • 3
  • h

    Hugo Shi

    11/16/2021, 5:50 PM
    Is Prefect Cloud down? Trying to log in, I'm getting 404s in the UI.
    👍 3
    c
    • 2
    • 1
  • h

    Hugo Kitano

    11/16/2021, 5:56 PM
    I have the same error, Hugo!
    h
    • 2
    • 2
  • j

    John Muehlhausen

    11/16/2021, 5:58 PM
    Last time I checked, running Prefect locally (vs Prefect Cloud) was not supported. Has that changed? Ideally we might want a managed appliance. Downtime like this is unacceptable. Our test agents are no longer processing flows.
    a
    • 2
    • 3
  • c

    Constantino Schillebeeckx

    11/16/2021, 6:51 PM
    Is
    StartFlowRun
    legacy code? This seems to suggest it is - however it's used throughout the current docs. Wondering if it's getting phased out.
    k
    • 2
    • 3
  • k

    Kendal Burkhart

    11/16/2021, 7:33 PM
    Hi. New to Prefect, first time posting. I have a flow which pulls data from an API. The first call retrieves a list of JSON, then that list is mapped for processing, i.e.:
    with Flow("api") as flow:
        client = get_client(api_key, api_secret)
        main = get_data(client)
        mapped = get_mapped_data.map(prefect.unmapped(client), main)
    The issue is that sometimes the flow can run for more than an hour, at which time the access token for the API expires (and 1 hour is the max that the issuer allows). I can catch the error in a mapped task and try doing a token refresh, but I'm unsure whether the new token will be available to the other mapped tasks, or if each mapped task will end up requesting its own new token, or something else entirely. I guess I'm asking what the best way is to address this problem. Thanks!
    k
    • 2
    • 2
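    One possible arrangement, sketched with the names from the snippet above (get_client, the credential variables, and the client methods are assumptions, not a real API): build the client inside each mapped task, so every task run fetches its own fresh token instead of sharing one unmapped client across the hour-plus flow.
    from prefect import Flow, Parameter, task, unmapped

    @task
    def get_data(api_key, api_secret):
        client = get_client(api_key, api_secret)   # fresh token for the initial listing
        return client.list_records()               # hypothetical call returning a list

    @task
    def get_mapped_data(record, api_key, api_secret):
        # Each mapped task run creates its own client, so the token is never older
        # than the task itself.
        client = get_client(api_key, api_secret)
        return client.fetch(record)                # hypothetical call

    with Flow("api") as flow:
        api_key = Parameter("api_key")
        api_secret = Parameter("api_secret")
        main = get_data(api_key, api_secret)
        mapped = get_mapped_data.map(main, unmapped(api_key), unmapped(api_secret))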
  • c

    Constantino Schillebeeckx

    11/16/2021, 7:49 PM
    I'm confused about how the UI is sorting my flow names, I would have expected
    genesys__fof
    (notice the double underscore) to be listed first. Am I missing something?
    k
    c
    • 3
    • 2
  • m

    Matt Alhonte

    11/16/2021, 8:12 PM
    So the cloud UI site runs on GCP?
    k
    • 2
    • 3
  • b

    bral

    11/16/2021, 8:15 PM
    Hello! It is possible to create a new flow run from code:
    client.create_flow_run(
        flow_id="d7bfb996-b8fe-4055-8d43-2c9f82a1e3c7",
        flow_run_name="docs example hello-world"
    )
    But when a run is triggered by Prefect, the names are random (or you can set flow_run_name manually from the UI). My typical Prefect script looks like:
    @task
    def task1():
        pass

    @task
    def task2():
        pass

    with Flow("my-flow") as flow:
        task1()
        task2()
    ...
    client.register(flow, project_name='project')
    Is there a way to set the flow run name by default in code? For example:
    name = f"name_{datetime.datetime.utcnow()}"
    k
    • 2
    • 2
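    One possible approach, sketched below and assuming Prefect 1.x's RenameFlowRun task (prefect.tasks.prefect.RenameFlowRun) behaves as remembered: have the first step of the flow rename the current run, so even scheduled or agent-triggered runs pick up the naming convention instead of a random name.
    import datetime
    from prefect import Flow, task
    from prefect.tasks.prefect import RenameFlowRun

    rename_run = RenameFlowRun()

    @task
    def make_run_name():
        return f"name_{datetime.datetime.utcnow().isoformat()}"

    @task
    def task1():
        pass

    with Flow("my-flow") as flow:
        # Rename the current flow run before anything else executes.
        renamed = rename_run(flow_run_name=make_run_name())
        task1(upstream_tasks=[renamed])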
  • a

    Aqib Fayyaz

    11/16/2021, 8:19 PM
    Does Prefect Server use the same UI that Prefect Cloud does, or are there some differences?
    k
    • 2
    • 1
  • k

    Kevin Kho

    11/16/2021, 8:20 PM
    In terms of code, it is largely the same. There are just some hidden or disabled features, because some services exist only in Cloud.
    ✅ 1