  • Peter

    2 years ago
    We are working on integrating Prefect with our existing applications and we've run into an issue with logging. Effectively, it looks like a Prefect flow disables existing loggers when you call flow.run(). I've read through the documentation on logging, which recommends using Prefect's utility function get_logger() to hook into a flow's logs. That works fine when creating a new application, but it would be a ton of work for us to change our existing applications that follow Python's standard logging practice, essentially defining logger = logging.getLogger(__name__) at the top of each module. Unfortunately, when we call flow.run() with tasks that invoke our existing application that uses standard logging, it looks like Prefect disables those logs. Is there any way to tell Prefect to log from existing applications that use Python's standard logging.getLogger instead of Prefect's get_logger utility?
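    (A possible workaround, sketched rather than official: Prefect 1.x has a logging.extra_loggers config setting, also exposed as the PREFECT__LOGGING__EXTRA_LOGGERS environment variable, which attaches Prefect's log handlers to named stdlib loggers so existing logging.getLogger calls keep working. The "myapp" module name below is hypothetical:)
    import logging

    import prefect

    # Existing application code keeps the stdlib pattern unchanged:
    logger = logging.getLogger(__name__)

    # Ask Prefect to forward the named stdlib loggers into flow logs.
    # Equivalent to: export PREFECT__LOGGING__EXTRA_LOGGERS='["myapp"]'
    # (assumes prefect.config can be mutated before the flow runs)
    prefect.config.logging.extra_loggers = ["myapp"]  # "myapp" is hypothetical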
  • Peter

    2 years ago
    Also here's a toy script that illustrates my issue:
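    (The script itself didn't survive this export; a minimal stand-in that reproduces the symptom described above might look like the following, with the logger and flow names being hypothetical:)
    import logging

    import prefect

    # Standard-library logger, as our existing applications define it
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)


    @prefect.task
    def log_something():
        # Visible when called directly, but swallowed once flow.run()
        # reconfigures logging
        logger.info("hello from the stdlib logger")


    with prefect.Flow("logging-repro") as flow:
        log_something()


    flow.run()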
    5 replies
  • Sandeep Aggarwal

    2 years ago
    Sharing on channel as well if anyone else would like to chime in.
    2 replies
  • Kai Weber

    2 years ago
    Hi, can anybody give me a hint as to what the GraphQL query in Hasura to start my flow "Hello World" has to look like? This query shows my flows:
    query MyQuery {
      flow(distinct_on: name) {
        name
        id
        description
      }
    }
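    (A guess at the shape of the answer, assuming runs are started with the create_flow_run mutation, the same one Matias uses further down; the id is a placeholder to be filled in from the query above:)
    mutation {
      create_flow_run(input: {
        flow_id: "<id from the query above>"
      }) {
        id
      }
    }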
    12 replies
  • Simone Cittadini

    2 years ago
    Hi, I have a strange error in a docker-swarm deploy: if I run "curl http://apollo:4200/graphql" from the ui container I get (correctly, I think) "GET query missing.", but the UI itself under API status says "Couldn't connect http://apollo:4200/graphql/".
    4 replies
  • Simone Cittadini

    2 years ago
    On a side note, I've noticed that if you call register on the same flow (no code changes) multiple times, it creates multiple versions. Is that considered a bug, or is that just how it works? I can't let anyone register whatever they want from their laptop, so in my plan this is how it will work:
    • code is made in a branch of a common "all flows" repository
    • once approved, it is merged and the pipeline deploys the package to the agent, which proceeds to register everything
    If I dumbly register everything, all the n flows that already exist and are untouched will bump their version, which is not optimal. (Not a problem if this is just how it works; I'll make the code scan less dumb with some hash checking, as sketched below. But of course, if it's just a matter of waiting for a fix, I'll gladly wait 😁)
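    (A minimal sketch of that hash-checking idea, assuming Prefect 1.x's flow.serialize(); the project name and the persistence of seen_hashes are hypothetical placeholders:)
    import hashlib
    import json


    def register_if_changed(flow, seen_hashes):
        # Hash the serialized flow; re-register only when something changed.
        payload = json.dumps(flow.serialize(), sort_keys=True, default=str)
        digest = hashlib.sha256(payload.encode()).hexdigest()
        if seen_hashes.get(flow.name) != digest:
            flow.register(project_name="all-flows")  # hypothetical project
            seen_hashes[flow.name] = digest
        return seen_hashes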
    2 replies
  • Matias Godoy

    2 years ago
    Hi guys! I'm trying to run a flow using the GraphQL API in Prefect Cloud by executing:
    mutation {
      create_flow_run(input: {
        flow_run_name: "Test"
        version_group_id: "81261519-e91c-4fe3-bf85-10cc3a2d5016"
      }) {
        id
      }
    }
    But I get the following error:
    {
      "graphQLErrors": [
        {
          "path": [
            "create_flow_run"
          ],
          "message": "Version group 81261519-e91c-4fe3-bf85-10cc3a2d5016 has no unarchived flows.",
          "extensions": {
            "code": "INTERNAL_SERVER_ERROR"
          }
        }
      ],
      "networkError": null,
      "message": "GraphQL error: Version group 81261519-e91c-4fe3-bf85-10cc3a2d5016 has no unarchived flows."
    }
    I'm sure the group ID is correct, and that there is a version of a test flow I created. I can even run it manually. What am I doing wrong? Thanks!
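    (One way to see what the error is complaining about, sketched on the assumption that the flow table exposes the usual Hasura-style filters and an archived field, as in the queries above:)
    query {
      flow(where: {version_group_id: {_eq: "81261519-e91c-4fe3-bf85-10cc3a2d5016"}}) {
        id
        name
        archived
      }
    }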
    5 replies
  • Howard Cornwell

    2 years ago
    Can anyone explain to me why this raises an error?
    import prefect
    import sqlalchemy as sql
    
    
    engine = sql.create_engine("postgresql://tmp:tmp@localhost:5432/tmp")
    tmp_schema = sql.MetaData()
    table = sql.Table("tmp", tmp_schema, sql.Column("tmp", sql.Integer))
    
    
    @prefect.task
    def create_table():
        tmp_schema.create_all(engine, tables=[table])
    
    
    with prefect.Flow("tmp") as flow:
        create_table()
    
    
    flow.storage = prefect.environments.storage.Docker(
        registry_url="localhost:5000",
        python_dependencies=["sqlalchemy"]
    )
    flow.register()
    Raises
    Traceback (most recent call last):
      File "issue.py", line 23, in <module>
        flow.register()
      File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1437, in register
        registered_flow = client.register(
      File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 649, in register
        serialized_flow = flow.serialize(build=build)  # type: Any
      File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1299, in serialize
        storage = self.storage.build()  # type: Optional[Storage]
      File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/environments/storage/docker.py", line 293, in build
        self._build_image(push=push)
      File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/environments/storage/docker.py", line 329, in _build_image
        dockerfile_path = self.create_dockerfile_object(directory=tempdir)
      File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/environments/storage/docker.py", line 436, in create_dockerfile_object
        cloudpickle.dump(self._flows[flow_name], f)
      File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 48, in dump
        CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback).dump(obj)
      File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 548, in dump
        return Pickler.dump(self, obj)
    TypeError: cannot pickle '_thread._local' object
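    (The module-level engine is the likely culprit: Docker storage cloudpickles the flow, and everything the task references goes with it, including the engine's _thread._local state. A minimal rework, assuming it is acceptable to build the engine at run time instead of import time:)
    import prefect
    import sqlalchemy as sql


    @prefect.task
    def create_table():
        # Build the engine and metadata inside the task so nothing
        # thread-local is captured when the flow is pickled.
        engine = sql.create_engine("postgresql://tmp:tmp@localhost:5432/tmp")
        tmp_schema = sql.MetaData()
        table = sql.Table("tmp", tmp_schema, sql.Column("tmp", sql.Integer))
        tmp_schema.create_all(engine, tables=[table])


    with prefect.Flow("tmp") as flow:
        create_table()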
    5 replies
  • Miguel

    2 years ago
    Hi everyone! I'm migrating an Airflow DAG to Prefect and Prefect Server and trying to figure out how to backfill this flow. I've created a function that uses client.create_flow_run to generate all the necessary runs with the correct context but, at least using the local agent, this creates way too many processes (one for each run, I guess). Is this the right approach, or is there a way to limit the concurrency / number of processes spawned?
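    (For reference, a minimal version of the backfill loop described above, assuming Prefect 1.x's Client.create_flow_run; the flow id, date range, and context key are placeholders:)
    from datetime import datetime, timedelta

    from prefect import Client

    client = Client()
    start = datetime(2021, 1, 1)
    for day in range(30):
        run_date = start + timedelta(days=day)
        client.create_flow_run(
            flow_id="<your-flow-id>",  # placeholder
            context={"backfill_date": run_date.isoformat()},
        )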
    4 replies
  • james.lamb

    2 years ago
    Good morning from Chicago! I'd like to get confirmation on something I think I've discovered. Is this true?
    If you use KubernetesJobEnvironment as the execution environment for a flow, any executor you pass will be ignored and only the default executor will be used.
    15 replies