Hui Zheng
01/21/2021, 1:15 AMGCS_result
. It happened to flow runs on different agents and k8e clusters. I wonder if there is a Prefect-cloud infrastructure issue?Sagun Garg
01/21/2021, 3:23 AMNoCredentialsError('Unable to locate credentials')Traceback
Reference Blog: https://makeitnew.io/prefect-a-modern-python-native-data-workflow-engine-7ece02ceb396
```
Unexpected error while reading from result handler: NoCredentialsError('Unable to locate credentials')Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/prefect/engine/results/s3_result.py", line 136, in read Bucket=self.bucket, Key=location, Fileobj=stream File "/usr/local/lib/python3.7/site-packages/boto3/s3/inject.py", line 678, in download_fileobj return future.result() File "/usr/local/lib/python3.7/site-packages/s3transfer/futures.py", line 106, in result return self._coordinator.result() File "/usr/local/lib/python3.7/site-packages/s3transfer/futures.py", line 265, in result raise self._exception
Loc Nguyen
01/21/2021, 3:26 AMsudo docker run -d prefecthq/prefect:latest
Checking docker logs, i have this:
tini (tini version 0.18.0)
Usage: tini [OPTIONS] PROGRAM -- [ARGS] | --version
Execute a program under the supervision of a valid init process (tini)
Command line options:
--version: Show version and exit.
-h: Show this help message and exit.
-s: Register as a process subreaper (requires Linux >= 3.4).
-p SIGNAL: Trigger SIGNAL when parent dies, e.g. "-p SIGKILL".
-v: Generate more verbose output. Repeat up to 3 times.
-w: Print a warning when processes are getting reaped.
-g: Send signals to the child's process group.
-e EXIT_CODE: Remap EXIT_CODE (from 0 to 255) to 0.
-l: Show license and exit.
Environment variables:
TINI_SUBREAPER: Register as a process subreaper (requires Linux >= 3.4).
TINI_VERBOSITY: Set the verbosity level (default: 1).
TINI_KILL_PROCESS_GROUP: Send signals to the child's process group.
It is quite unclear what is the issue 😞eamonn faherty
01/21/2021, 11:11 AMVitaly Shulgin
01/21/2021, 3:07 PMfetch
when prefect calls to run methodDilip Thiagarajan
01/21/2021, 3:48 PMRyan Kelly
01/21/2021, 4:49 PMAlex Rud
01/21/2021, 5:00 PMPedro Machado
01/21/2021, 5:44 PMLukasz Mentel
01/21/2021, 7:13 PMalex
01/21/2021, 7:20 PMdemo('flow1')
and demo('flow2').
I then changed the signature of DemoClass to take in another param, and reflected that change ie. tc = DemoClass(source_name, "new_param")
and ran demo('flow1')
again. This caused flow2 to start failing due it missing the new param, even though the flow was not directly modified.
I'm wondering what exactly happened here? If I'm using a venv, is that venv's python being used? If I pip install --upgrade
a package with breaking changes, would all my flows immediately start failing? Or if I deployed a flow after the upgrade?
class DemoClass:
def __init__(self, source_name):
self.source_name = source_name
# self.new_param = new_param (added later in signature)
pass
def run(self):
return self.source_name
from myprefect.demos.demo_class import DemoClass
from prefect import Flow, task
from prefect.schedules import IntervalSchedule
@task(log_stdout=True)
def initialize_and_run_class(source_name):
tc = DemoClass(source_name)
r = tc.run()
print(r)
return r
def demo(data_source):
with Flow(
f"flow - {data_source}",
schedule=IntervalSchedule(interval=datetime.timedelta(seconds=60)),
) as f:
tsk = initialize_and_run_class(data_source)
f.register(project_name="demo")
Lukasz Mentel
01/21/2021, 7:22 PMwith Flow("my-flow") as flow:
data_batch = fetch_dataframe(path)
inputs = fetch_series(data_batch, "series-name")
result = compute_vfr(inputs)
power = fetch_series(data_batch, "power")
cop = compute_cop(result, power)
Lukasz Mentel
01/21/2021, 7:25 PMcompute_cop
which has two dependencies, and when I try to replicate that with Flow.add_edge()
I can only specify a single key
to pass the data between tasks. Does anyone know how to get solve or get around this?Marco Palmeri
01/21/2021, 8:50 PMjack
01/21/2021, 9:39 PM@prefect.io
email address, or it says to contact an admin.Boris Gaganelov
01/21/2021, 9:44 PMGitLab
as a Storage - things seem to be working fine except for flows which actually import things from a GitLab subdirecroty.
So I keep getting ModuleNotFoundError("No module named 'tasks'")
.
My Storage repository file structure looks a bit like this:
root-of-repo/
├── flow_hello.py
└── tasks/
├── __init__.py
└── hello.py
And the in-code flow config is looking like so:
flow.storage = GitLab(
repo="XXXXX",
host="XXXXX",
path="flows/flow_hello.py",
secrets=["GITLAB_ACCESS_TOKEN"],
ref="master",
)
But it seems to be failing to resolve the import so I'm a little bit confused.
Am I somehow able to add it to the path of the agent despite it being a GitLab storage?Marwan Sarieddine
01/21/2021, 11:08 PMcontext
object.
Is there a difference between importing the context
globally vs from within a task ?
i.e.
import prefect
@task
def t():
prefect.context.get("...")
vs
@task
def t():
import prefect
prefect.context.get("...")
Also in a good portion of the docs the context
is used as prefect.context
- is that purely for readability reasons or is there a side effect for using from prefect import context
that one should be aware of ? (My current intuition/understanding is that there shouldn't be any difference here)Mark McDonald
01/21/2021, 11:53 PMJeremiah
01/22/2021, 1:05 AMJosh
01/22/2021, 5:05 AMAnze Kravanja
01/22/2021, 6:06 AMMatthew Blau
01/22/2021, 2:58 PMciaran
01/22/2021, 4:58 PMwith Flow("This is a test") as flow:
path = Parameter(name="path", default="blah"),
variable = Parameter(name="variable", default="blah"),
source_crs = Parameter(name="source_crs", default="epsg:4326")
dataset = open__file(path)
sigh it's Friday.ciaran
01/22/2021, 5:14 PMTuple
is not what I wantMatt Gordon
01/22/2021, 6:46 PMStartFlowRun
:
Task used to kick off a flow run using Prefect Core’s server or Prefect Cloud. If multiple versions of the flow are found, this task will kick off the most recent unarchived version.(Emphasis mine.) The problem I’m having is this: if I instantiate StartFlowRun with a
project_name
, I get:
Malformed response received from Cloud - please ensure that you have an API token properly configured.
If I instantiate it without specifying a project_name
, at runtime I see:
raise ValueError("Must provide a project name.")
raised from StartFlowRun
Matt Gordon
01/22/2021, 6:47 PMproject_name
is optional.Matt Gordon
01/22/2021, 6:49 PMfrom prefect import Flow, Task, task, Parameter
from prefect.tasks.prefect import StartFlowRun
@task
def first(k):
return {k: 10}
@task
def second(foo):
foo['quux'] = 20
foo['foo'] += 3
return foo
with Flow('f1') as f1:
p = Parameter('k', default='foo')
x = first(p)
with Flow('f2') as f2:
foo = Parameter('foo', default={'foo': 20})
y = second(foo)
ft1 = StartFlowRun(flow_name='f1',wait=True) # project_name here or not?
ft2 = StartFlowRun(flow_name='f2', wait=True)
with Flow('f3') as f3:
_r = ft1()
r = f3.run()
Matt Gordon
01/22/2021, 6:49 PMMatt Gordon
01/22/2021, 6:49 PMJames Phoenix
01/22/2021, 7:23 PMfrom prefect import Task
def learning(task, old_state, new_state):
if isinstance(new_state, state.Success):
print("Yay")
return new_state
class MyTask(Task):
def run(self):
return "hello"
task_1 = MyTask()
flow = Flow(name="my_flow", tasks=[task_1], state_handlers=[learning])
state = flow.run()
James Phoenix
01/22/2021, 7:23 PMfrom prefect import Task
def learning(task, old_state, new_state):
if isinstance(new_state, state.Success):
print("Yay")
return new_state
class MyTask(Task):
def run(self):
return "hello"
task_1 = MyTask()
flow = Flow(name="my_flow", tasks=[task_1], state_handlers=[learning])
state = flow.run()
Michael Adkins
01/22/2021, 7:31 PMfrom prefect.engine import state
and from prefect import Flow
this runs as expected[2021-01-22 13:30:33] INFO - prefect.FlowRunner | Beginning Flow run for 'my_flow'
/Users/michaeladkins/prefect/core/src/prefect/engine/flow_runner.py:235: UserWarning: prefect.engine.executors.LocalExecutor has been moved to `prefect.executors.LocalExecutor`, please update your imports
executor = prefect.engine.get_default_executor_class()()
[2021-01-22 13:30:33] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-01-22 13:30:33] DEBUG - prefect.FlowRunner | Flow 'my_flow': Handling state change from Scheduled to Running
[2021-01-22 13:30:33] INFO - prefect.TaskRunner | Task 'MyTask': Starting task run...
[2021-01-22 13:30:33] DEBUG - prefect.TaskRunner | Task 'MyTask': Handling state change from Pending to Running
[2021-01-22 13:30:33] DEBUG - prefect.TaskRunner | Task 'MyTask': Calling task.run() method...
[2021-01-22 13:30:33] DEBUG - prefect.TaskRunner | Task 'MyTask': Handling state change from Running to Success
[2021-01-22 13:30:33] INFO - prefect.TaskRunner | Task 'MyTask': Finished task run for task with final state: 'Success'
[2021-01-22 13:30:33] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-01-22 13:30:33] DEBUG - prefect.FlowRunner | Flow 'my_flow': Handling state change from Running to Success
Yay
state
variable which should contain the Prefect states with the result of your flowJames Phoenix
01/23/2021, 1:31 PM