https://prefect.io logo
Join the conversationJoin Slack
Channels
announcements
ask-marvin
best-practices-coordination-plane
data-ecosystem
data-tricks-and-tips
events
find-a-prefect-job
geo-australia
geo-bay-area
geo-berlin
geo-boston
geo-chicago
geo-colorado
geo-dc
geo-israel
geo-japan
geo-london
geo-nyc
geo-seattle
geo-texas
gratitude
introductions
marvin-in-the-wild
prefect-ai
prefect-aws
prefect-azure
prefect-cloud
prefect-community
prefect-contributors
prefect-dbt
prefect-docker
prefect-gcp
prefect-getting-started
prefect-integrations
prefect-kubernetes
prefect-recipes
prefect-server
prefect-ui
random
show-us-what-you-got
Powered by Linen
prefect-community
  • s

    Samuel Hinton

    02/03/2021, 1:56 PM
    Also, it seems like the official prefect image is configured to display the ascii prefer splash screen (below) and then immediately terminate? The docs say to just use docker-compose, but point to the generic docker-compose website and not an example compose file. Is there an example compose file which has the server configured to keep the server online?
    _____ _____ ______ ______ ______ _____ _______
    | __ \| __ \| ____| ____| ____/ ____|__ __|
    | |__) | |__) | |__ | |__ | |__ | | | |
    | ___/| _ /| __| | __| | __|| | | |
    | | | | \ \| |____| | | |___| |____ | |
    |_| |_| \_\______|_| |______\_____| |_|
    
    Thanks for using Prefect!!!
    
    This is the official docker image for Prefect Core, intended for executing
    Prefect Flows. For more information, please see the docs:
    <https://docs.prefect.io/core/getting_started/installation.html#docker>
    j
    m
    • 3
    • 4
  • s

    Samuel Hinton

    02/03/2021, 2:46 PM
    Apologies for the chat spam, Im just wondering what else I am supposed to do after running
    prefect server start
    locally (which launches fine and gets me to the Welcome to PREFECT SERVER ascii splash screen). Navigating to localhost:8080 shows me connect to the server (and pull down the favicon+title) but the web page is empty (completely white). I can see its got content and scripts in the html, but theres this error and nothing is every made visible in the UI:
    ➕ 1
    p
    • 2
    • 2
  • m

    Mitchell Bregman

    02/03/2021, 3:00 PM
    Following up from yesterday.. situation is as follows: I could not add a new member to Cloud; they were previously a member of our team, but since they could not sign in (forgot password) I tried to remove them and add them back in. Now, an invite is in my team members inbox, when they click to join - it takes them to the sign in page.. understandable. Now, we cannot sign in because they do not remember the password and when you click forgot password, this email comes through… asking for sign-in help brings you back to the forgot password screen… just keeps going in a cycle. How can I get my team members password reset, given they are now a pending invite to our team?
    n
    c
    • 3
    • 3
  • s

    Samuel Hinton

    02/03/2021, 3:21 PM
    Hi all! I am running the server locally and initially when opening the UI Id be able to see the toolbar but nothing else. After some mindless googling, I found an instruction from here https://github.com/flavienbwk/prefect-docker-compose says I need to run
    prefect backend server
    prefect server create-tenant --name default --slug default
    And doing this does seem to fix the problem. Is there a way I can specify in the config options or somewhere in the docker-compose that I’d like to ensure this tenant exists on server startup?
    m
    j
    • 3
    • 5
  • v

    Verun Rahimtoola

    02/03/2021, 6:30 PM
    hi! is it possible to hook into prefect's flow registration logic to run some custom logic of our own, when a flow is being registered with the prefect server backend?
    m
    b
    • 3
    • 5
  • a

    Amit

    02/03/2021, 8:31 PM
    Do I need to add payment details to add read only users as well?
    j
    • 2
    • 1
  • c

    Carter Kwon

    02/03/2021, 10:04 PM
    Hello, I plan on running my flows using ECS and I’d like to have a separate IAM role for each task so it can have the minimum required access. I plan on using Terraform to provision the required AWS resources. I’d like to keep the task’s IAM related code coupled with the flow code so it can be used easily with our CI/CD pipeline. Does anyone have any ideas on how to best achieve this? It would be great if you could define the permissions needed in the task definition so it could be added in the flow code using ECSRun, but I don’t believe that is possible. It sounds like the only option is providing the arn of an existing role. Thanks!
    m
    b
    • 3
    • 4
  • b

    BK Lau

    02/03/2021, 11:51 PM
    Q: Are there a formal set of Prefect
    lifecycle
    methods or hooks or handlers for flow/tasks that user can override or register?? Something analogous or similar along this idea(https://kubernetes.io/docs/concepts/containers/container-lifecycle-hooks/)
    c
    • 2
    • 1
  • s

    Sam Peck

    02/04/2021, 12:12 AM
    Hi gang, I’m having a confusing time with dealing with inconsistent behavior between how failed results interact with
    .map
    . When I run a flow like this:
    with Flow(f"test flow|) as flow:
      values = [1,2,3]
      task_1_results = task1.map(values)
      task_2_results = task2.map(task_1_results)
    Any task1 results that raised a
    FAIL
    signal don’t get run by task2, instead task2 aborts early with
    Finished task run for task with final state: 'TriggerFailed'
    However when I change this slightly to introduce a filter of some kind:
    my_filter = FilterTask()
    
    with Flow(f"test flow|) as flow:
      values = [1,2,3]
      task_1_results = my_filter(task1.map(values))
      task_2_results = task2.map(task_1_results)
    All task1 results get mapped onto task2, including those that failed. I get the sense that I’ve misunderstood the functional API and that’s what’s tripping me up, however I haven’t found anything in the docs that unlocks my intuition about why it would behave this way and how to modify that behavior. My first thought was to modify the filter to look out for failed results, but from stepping through in my debugger it doesn’t look like the trigger function gets any of that context.
    c
    • 2
    • 3
  • s

    Sanjay Patel

    02/04/2021, 12:13 AM
    Hi, I'm pretty sure my task graph executing on dask is running breadth first. I notice all my level tasks completing before a new one starts. I also saw that since V 0.12.0 that depth first was available. I'm using Prefect Core with dask, can anyone point me to where I can specify the dask executor to use depth first? Thanks so much!
    c
    • 2
    • 2
  • m

    Michael Wooley

    02/04/2021, 6:13 AM
    Hello, my question became somewhat lengthy so I've placed it in the attached snippet. The tl;dr is: Can I
    yield
    part of a task result for use downstream? If not, what is the best alternative? Or am I just thinking about this all wrong?
    q.md
    m
    • 2
    • 2
  • j

    Joël Luijmes

    02/04/2021, 10:41 AM
    In a flow I want to check a result, and only execute some downstream tasks if that condition is true. However, later on I want to merge that branch back in the main flow. E.g. https://docs.prefect.io/core/idioms/conditional.html#merging-branches-in-a-flow The example above deals with two branches, True/False, where I’m only interested in checking for True. I got the following POC, which works fine AFAIK, but I’d like to validate this is the rightt method, and if I’m not introducing undesired behavior. Edit: I got a better example which doesn’t work as I want to below. I always want to return to the main branch and thus run task_end. But this doesn’t work as I want, now everything is Skipped because the condition doesn’t work. Basically I want a task which continues on Skipped or Success I suppose? What is best practice here
    import prefect
    from prefect import task, Flow, case
    from prefect.tasks.control_flow import merge
    
    @task
    def check_condition():
        return True
    
    @task
    def log_output(val):
        <http://prefect.context.logger.info|prefect.context.logger.info>(val)
    
    with Flow("conditional-branches") as flow:
        cond = check_condition()
    
        with case(cond, False):
            task_true = log_output("Branch: true")
    
        task_end = log_output("Main branch end")
        task_end.set_upstream(merge(task_true))
    
    flow.visualize()
    flow.run()
    j
    • 2
    • 5
  • p

    Peter Roelants

    02/04/2021, 11:10 AM
    Hi Prefect, I'm trying to build up my understanding of how a flow gets registered and run, while what happens in the background. I tried to sketch a diagram (attached here). I'm curious, is there anything plainly wrong in this diagram? Is there another diagram of the Prefect Runtime Architecture that I can reference?
    m
    b
    • 3
    • 12
  • s

    Samuel Hinton

    02/04/2021, 11:21 AM
    Hi team! Are there any useful debugging methods for figuring out why agents arent executing flows? I can see the agent start nicely, and it appears in the Prefect dashboard as healthy, but the
    hello_flow
    example flow never executes. The agent startup seems fine:
    agent_1      | [2021-02-04 11:16:51,742] INFO - agent | Starting LocalAgent with labels ['753b3ccf1df5']
    agent_1      | [2021-02-04 11:16:51,742] INFO - agent | Agent documentation can be found at <https://docs.prefect.io/orchestration/>
    agent_1      | [2021-02-04 11:16:51,742] INFO - agent | Agent connecting to the Prefect API at <http://apollo:4200>
    And its still querying the server (see image), but it never actually does anything? I note that manually starting an agent seems to work, but including an agent in the docker-compose that launches all the other prefect services does not, and the debug output is just continually stating that no flows are found, disagreeing with the UI rather confusingly.
    agent_1      | [2021-02-04 11:34:48,152] DEBUG - agent | No flow runs found
    agent_1      | [2021-02-04 11:34:48,153] DEBUG - agent | Next query for flow runs in 10.0 seconds
    a
    • 2
    • 2
  • s

    Samuel Hinton

    02/04/2021, 12:46 PM
    Hi all! Im very confused about labelling our flows and hopefully someone has an easy solution. My flows never get run because prefect attaches a label which is my machine name (see image). This label is attached regardless of the executor (local, local dask and dask all do it). My agent is not my machine, and thus does not have this label. Does anyone know what the label represents so I can figure out whats going on.
    👀 1
    a
    g
    • 3
    • 7
  • m

    Michael Hadorn

    02/04/2021, 1:29 PM
    Hi :) What is the best solution to deploy a custom flow with some python dependencies? I know I need a remote storage (mostly git* or docker). I didn't got the key differences between DockerRun (run_config) or Docker (storage). For simple python dependencies: should I use the official prefect image and set the python_dependencies via the storage object? So that i don't even need an dockerfile. Is there an example anywhere? I think it's an absolut basic task... We use a single machine setup at the moment. Best regards Michael
    m
    • 2
    • 11
  • s

    Samuel Hinton

    02/04/2021, 1:46 PM
    @Greg Roche, @Amanda Wee - any other ideas on why flows might not execute that isnt a labeling issue? Now using an S3 bucket, I can confirm that the only label on a flow is “any”. There is an agent running and polling which has “any” label set. And yet the agent is always saying “No flow runs found”. Im pulling my hair out trying to get the hello world equivalent running here and I just dont understand where things might be going wrong :s
    s
    • 2
    • 13
  • a

    Arnoldas Bankauskas

    02/04/2021, 2:42 PM
    Hi I have some issues when I try so run Local server & UI I'm pretty shore that I don all correctly (because first i tested on local PC and all works fine than I create VM with WIN10 and done all the same steps) / i use Docker Descktop last version
    PS C:\WINDOWS\system32> prefect backend server
    Backend switched to server
    PS C:\WINDOWS\system32> prefect server start
    Traceback (most recent call last):
      File "site-packages\docker\api\client.py", line 259, in _raise_for_status
      File "site-packages\requests\models.py", line 941, in raise_for_status
    requests.exceptions.HTTPError: 500 Server Error: Internal Server Error for url: <http+docker://localnpipe/version>
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "site-packages\docker\api\client.py", line 205, in _retrieve_server_version
      File "site-packages\docker\api\daemon.py", line 181, in version
      File "site-packages\docker\api\client.py", line 265, in _result
      File "site-packages\docker\api\client.py", line 261, in _raise_for_status
      File "site-packages\docker\errors.py", line 31, in create_api_error_from_http_exception
    docker.errors.APIError: 500 Server Error: Internal Server Error ("b'open \\\\.\\pipe\\docker_engine_linux: The system cannot find the file specified.'")
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "docker-compose", line 3, in <module>
      File "compose\cli\main.py", line 67, in main
      File "compose\cli\main.py", line 123, in perform_command
      File "compose\cli\command.py", line 69, in project_from_options
      File "compose\cli\command.py", line 132, in get_project
      File "compose\cli\docker_client.py", line 43, in get_client
      File "compose\cli\docker_client.py", line 170, in docker_client
      File "site-packages\docker\api\client.py", line 188, in __init__
      File "site-packages\docker\api\client.py", line 213, in _retrieve_server_version
    docker.errors.DockerException: Error while fetching server API version: 500 Server Error: Internal Server Error ("b'open \\\\.\\pipe\\docker_engine_linux: The system cannot find the file specified.'")
    [6744] Failed to execute script docker-compose
    Exception caught; killing services (press ctrl-C to force)
    maybe any ideas how to solve that ?
    m
    • 2
    • 1
  • k

    Karolína Bzdušek

    02/04/2021, 3:00 PM
    Asking here for better reach, as I see that this channel has more members (originally asked in perfect-ui channel): "Hi all, can you please help me to understand such a behaviour? When I run my flow in UI, the tasks are scheduled and staying in pending state. I used GitHub storage, running it locally (run_config=LocalRun(labels=["dev", "maps"])). Local agent has the very same labels. Originally, when storage was local as well, everything was running smoothly, when triggered run via UI. I looked at docs, looked through slack and haven't found anything useful."
    👍 1
    s
    s
    • 3
    • 6
  • m

    Marwan Sarieddine

    02/04/2021, 4:00 PM
    Hi folks, I am wondering if someone had to deploy a prefect agent and dask-kubernetes execution environment in an istio-enabled namespace The issue we are facing is that the dask-scheduler can no longer find the dask-worker pod to run the tasks so the flow is stuck in a Running state with no tasks executed
    a
    • 2
    • 5
  • c

    Cole Howard

    02/04/2021, 5:44 PM
    Hi all! Can anyone tell me what the icon means on the agent page in the UI? I'm unsure why my local agent has a globe icon but my ecs agent has nothing
    n
    • 2
    • 2
  • l

    Luis Gallegos

    02/04/2021, 5:48 PM
    Hi all! I want to know if there any way to free space of the database psql running with docker, over the UI or a clean way like graphql. And avoid make forced directly in the database. Something like...
    query {
      database {
        name: prefet_server
        table: flow_run
        action: flush
        where: date <= '2020-12-31'
    		
      }
    }
    m
    • 2
    • 3
  • s

    Sean Talia

    02/04/2021, 7:01 PM
    i have a fairly high level question about the
    config.toml
    – i'm just trying to figure out where in the development lifecycle the
    config.toml
    file is primarily used. I'm looking at this documentation on extra loggers and am wondering if I want to have different loggers in use when different flows are running, and i'm using
    DockerRun
    for my runconfig, is the most appropriate thing to do to just change up the
    PREFECT_LOGGING_EXTRA_LOGGERS
    env variable in my runconfig images?
    m
    • 2
    • 4
  • a

    Alvis Tang

    02/04/2021, 7:34 PM
    I just a maybe silly question. How can I run a flow with
    run_agent
    with parameters?
    m
    • 2
    • 4
  • v

    Vincent

    02/04/2021, 8:00 PM
    Has anyone seen an error like this. Don't really know where to start debugging for this.
    Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'request to <http://graphql:443/graphql/> failed, reason: connect EHOSTUNREACH 10.30.43.11:443', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'request to <http://graphql:443/graphql/> failed, reason: connect EHOSTUNREACH 10.30.43.11:443', 'type': 'system', 'errno': 'EHOSTUNREACH', 'code': 'EHOSTUNREACH'}}}])
    Traceback (most recent call last):
      File "/opt/conda/envs/dev/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 98, in call_runner_target_handlers
        state = self.client.set_task_run_state(
      File "/opt/conda/envs/dev/lib/python3.8/site-packages/prefect/client/client.py", line 1503, in set_task_run_state
        result = self.graphql(
      File "/opt/conda/envs/dev/lib/python3.8/site-packages/prefect/client/client.py", line 319, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'request to <http://graphql:443/graphql/> failed, reason: connect EHOSTUNREACH 10.30.43.11:443', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'request to <http://graphql:443/graphql/> failed, reason: connect EHOSTUNREACH 10.30.43.11:443', 'type': 'system', 'errno': 'EHOSTUNREACH', 'code': 'EHOSTUNREACH'}}}]
    a
    m
    +3
    • 6
    • 14
  • m

    Matt Denno

    02/04/2021, 8:25 PM
    Hello, I am having and issue that relates to serializing my results and setting checkpoint=False. I have a task that returns a Google ProtoBuff object, say like:
    @task(checkpoint=False)
    def fetch() -> ProtoBufObject:
        return fetch_proto_buff_object()
    In the past I have set checkpoint=False to avoid serialization errors with the ProtoBuff objects, but now, even with checkpoint=False I am getting the following when i try to register the flow.
    TypeError: can't pickle google.protobuf.pyext._message.MessageDescriptor objects
    To resolve this I tried to create a serializer for the ProtoBuff like so:
    class PBSerializer(Serializer):
        def serialize(self, value: Any) -> bytes:
            # transform a Python object into bytes
            return value.SerializeToString()
    
        def deserialize(self, value: bytes) -> Any:
            # recover a Python object from bytes
            ts = amanzi_pb2.TimeSeries
            return ts.ParseFromString(value)
    and called like:
    @task(checkpoint=False, result=LocalResult(serializer=PBSerializer()))
    def fetch() -> ProtoBufObject:
        return fetch_proto_buff_object()
    But I get the same error still. It doesn't seem like the result serializer is being used, but it is also not causing any errors. I have tried to debug and step through the code but can't figure out what is happening. Any help, guidance, ideas for how to resolve would be welcomed. I am on v 0.13.19
    m
    f
    j
    • 4
    • 12
  • w

    Wenli Wan

    02/04/2021, 9:00 PM
    hello, I have a question regarding logging the stdout from a Task and writing it to a log file per task. I was following this doc using the imperative API: https://docs.prefect.io/core/idioms/logging.html 1. inherit the class
    Task
    with 
    log_stdout=True
    2. add a
    FileHandeler
    to the logger of the Task with the following code:
    class MyTask(Task):
        def __init__(self, test_id, log_stdout=True, **kwargs):
            super().__init__(log_stdout=log_stdout, **kwargs)
            self.test_id = test_id
            fh = FileHandler(f"../logging_test/test_{test_id}.log")
            self.logger.addHandler(fh)
        def run(self):
            <http://self.logger.info|self.logger.info>("An info message.")
            print(f'test {self.test_id}')
    Then run it as the following:
    t1 = MyTask(1, name="test1", log_stdout=False)
    t2 = MyTask(2, name="test2")
    f = Flow("logging test")
    f.add_task(t1)
    f.add_task(t2)
    f.run()
    I found out the stdout from the Task has been redirected to the logger of the
    TaskRunner
    instead of the logger of
    MyTask
    :
    [2021-02-04 18:00:09-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'logging test'
    [2021-02-04 18:00:09-0500] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
    [2021-02-04 18:00:09-0500] DEBUG - prefect.FlowRunner | Flow 'logging test': Handling state change from Scheduled to Running
    [2021-02-04 18:00:09-0500] INFO - prefect.TaskRunner | Task 'test1': Starting task run...
    [2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test1': Handling state change from Pending to Running
    [2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test1': Calling task.run() method...
    [2021-02-04 18:00:09-0500] INFO - prefect.test1 | An info message from test 1.
    test 1
    [2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test1': Handling state change from Running to Success
    [2021-02-04 18:00:09-0500] INFO - prefect.TaskRunner | Task 'test1': Finished task run for task with final state: 'Success'
    [2021-02-04 18:00:09-0500] INFO - prefect.TaskRunner | Task 'test2': Starting task run...
    [2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test2': Handling state change from Pending to Running
    [2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test2': Calling task.run() method...
    [2021-02-04 18:00:09-0500] INFO - prefect.test2 | An info message from test 2.
    [2021-02-04 18:00:09-0500] INFO - prefect.TaskRunner | test 2
    [2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test2': Handling state change from Running to Success
    [2021-02-04 18:00:09-0500] INFO - prefect.TaskRunner | Task 'test2': Finished task run for task with final state: 'Success'
    [2021-02-04 18:00:09-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2021-02-04 18:00:09-0500] DEBUG - prefect.FlowRunner | Flow 'logging test': Handling state change from Running to Success
    is there a way to have the Task stdout log in the Task logger? I'm trying to keep a log per task include the task stdout
    m
    • 2
    • 6
  • a

    Alvis Tang

    02/04/2021, 9:54 PM
    Hi a dumb question maybe. I'm trying to make my flow to run in parallel via
    run_agent
    but I can't make it work I tried to use
    flow.executor = LocalDaskExecutor()
    flow.run()
    & it can run in parallel, but not
    run_agent
    . any clue?
    m
    • 2
    • 10
  • c

    Chris Jordan

    02/04/2021, 10:10 PM
    Hey folks, I'm encountering an issue trying to get a flow to rerun itself when a condition is met. Here's the code for my flow:
    rerun_flow = StartFlowRun(
            flow_name="blast_import_flow",
            project_name="python_imports"
            )   
    spawn_time_series_import = StartFlowRun(
            project_name="python_imports", 
            flow_name="blast_metric_series_flow",
            wait=True)
    
    with Flow("blast_import_flow", 
            schedule=daily_schedule, 
            state_handlers=[cloud_only_slack_handler]
           ) as flow:
    
        data_object = get_blast_batch()
        check_api_result = check_api(data_object=data_object)
        transformed_data_objects = transform(data_object=check_api_result)
        imported = import_to_database(data_object_list=transformed_data_objects)
        blast_ids, record_length = push_records_to_summary_table(imported)
    
        # package ids into the right dict format
        packaged_ids = package_into_params.map(blast_id=blast_ids)
    
        spawn = spawn_time_series_import.map(
                parameters=packaged_ids)
        with case(record_length, 5): 
            rerun_flow()
    The ETL portions of the flow are working just fine, and the
    spawn_time_series_import
    is correctly creating its tasks. But
    rerun_flow
    isn't - I'm seeing this error:
    Unexpected error: ClientError([{'path': ['user'], 'message': 'field "user" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: \'query_root\''}}}])
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 860, in get_task_run_state
        logger=self.logger,
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/tasks.py", line 449, in method
        return run_method(self, *args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 172, in run
        run_link = client.get_cloud_url("flow-run", flow_run_id)
      File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 887, in get_cloud_url
        tenant_slug = self.get_default_tenant_slug(as_user=as_user and using_cloud_api)
      File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 920, in get_default_tenant_slug
        res = self.graphql(query)
      File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 318, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['user'], 'message': 'field "user" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: \'query_root\''}}}]
    I'm not sure what the difference is, aside from the wait. I figure I don't want a bunch of these telescoping and waiting on each other to finish.
    m
    • 2
    • 67
  • d

    Danny Vilela

    02/04/2021, 11:21 PM
    Hi all! I’m trying to use Prefect’s class-based tasks, and struggling to understand how to best make this work with my IDE’s function/variable annotations. For example, if I have this dummy flow:
    from prefect import Flow
    from prefect import Task
    
    
    class AddTask(Task):
        def run(self, x: int, y: int) -> int:
            return x + y
    
    
    with Flow(name="adding") as flow:
        adder: AddTask = AddTask()
        a: int = adder(x=1, y=2)
        b: int = adder(x=a, y=3)
        print(a, b)
    
    flow.run()
    It runs just fine, but I get two different “warning” highlights from my IDE (PyCharm): 1. On the
    def run(self, x: int, y: int) -> int
    line:
    Signature of method 'AddTask.run()' does not match signature of base method in class 'Task'
    2. On both assignment lines in the flow:
    Expected type 'int', got 'AddTask' instead
    I’d like to know what I’m doing that’s un-Prefect-like. Does Prefect work well with type annotations? I’m aware that, really, the
    x
    and
    y
    parameters to
    AddTask
    are actually converted to parameters (or a constant task?). But that’s maybe not as clear as annotating them as integers.
    m
    • 2
    • 7
Powered by Linen
Title
d

Danny Vilela

02/04/2021, 11:21 PM
Hi all! I’m trying to use Prefect’s class-based tasks, and struggling to understand how to best make this work with my IDE’s function/variable annotations. For example, if I have this dummy flow:
from prefect import Flow
from prefect import Task


class AddTask(Task):
    def run(self, x: int, y: int) -> int:
        return x + y


with Flow(name="adding") as flow:
    adder: AddTask = AddTask()
    a: int = adder(x=1, y=2)
    b: int = adder(x=a, y=3)
    print(a, b)

flow.run()
It runs just fine, but I get two different “warning” highlights from my IDE (PyCharm): 1. On the
def run(self, x: int, y: int) -> int
line:
Signature of method 'AddTask.run()' does not match signature of base method in class 'Task'
2. On both assignment lines in the flow:
Expected type 'int', got 'AddTask' instead
I’d like to know what I’m doing that’s un-Prefect-like. Does Prefect work well with type annotations? I’m aware that, really, the
x
and
y
parameters to
AddTask
are actually converted to parameters (or a constant task?). But that’s maybe not as clear as annotating them as integers.
m

Michael Adkins

02/04/2021, 11:24 PM
Hi @Danny Vilela -- to get the IDE to behave takes a lot of hacky signature updates (something like Pydantic does this) and I don't think we can easily explain to your IDE that this is the intent. I think that type-hints with Prefect are mostly for readability not enforcement with a type-checking system.
d

Danny Vilela

02/04/2021, 11:32 PM
@Michael Adkins Does that also apply to each task’s
run
method? It’s no real problem to silence mypy/the IDE on those lines (e.g.,
# noqa
), but just making sure 🙂
m

Michael Adkins

02/04/2021, 11:33 PM
What do you mean?
d

Danny Vilela

02/04/2021, 11:35 PM
Sorry, re: my first point:
Signature of method 'AddTask.run()' does not match signature of base method in class 'Task'
. For the second point you can just set
b: int = adder(x=a, y=2)  # noqa
. For the first, you’d have to do
def run(self, x: int, y: int) -> int:  # noqa
. Just checking that my understanding of how/when you need to silence the type checker is correct!
m

Michael Adkins

02/04/2021, 11:38 PM
Yeah because the base signature is
def run(self) -> None:
you'll get a mismatch error there. I'm not sure why we don't have it take
Any
and return
Any
but even then I think that pycharm may complain.
🙌 1
d

Danny Vilela

02/04/2021, 11:40 PM
Sounds good! Thanks for being so responsive @Michael Adkins 🙏
m

Michael Adkins

02/05/2021, 12:25 AM
You're welcome! If you want to open an issue for this, I imagine it's something we could consider better support for in the future but as of now it's just sugar.
View count: 9